
apache-spark - Splitting a Dataset by column value in Spark

Reposted. Author: 行者123. Updated: 2023-12-04 04:40:25

I am trying to split a dataset into separate datasets based on the contents of the Manufacturer column. It is very slow; please suggest a way to improve the code so that it runs faster and uses less Java code.

    List<Row> lsts = countsByAge.collectAsList();

    for (Row lst : lsts) {
        String man = lst.toString();
        man = man.replaceAll("[\\p{Ps}\\p{Pe}]", "");
        Dataset<Row> DF = src.filter("Manufacturer='" + man + "'");
        DF.show();
    }

The code, input, and output datasets are shown below.
    package org.sparkexample;

    import java.util.List;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SQLContext;
    import org.apache.spark.sql.SparkSession;

    public class GroupBy {

        public static void main(String[] args) {
            System.setProperty("hadoop.home.dir", "C:\\winutils");
            JavaSparkContext sc = new JavaSparkContext(
                    new SparkConf().setAppName("SparkJdbcDs").setMaster("local[*]"));
            SQLContext sqlContext = new SQLContext(sc);
            SparkSession spark = SparkSession.builder().appName("split datasets").getOrCreate();
            sc.setLogLevel("ERROR");

            Dataset<Row> src = sqlContext.read()
                    .format("com.databricks.spark.csv")
                    .option("header", "true")
                    .load("sample.csv");

            Dataset<Row> unq_manf = src.select("Manufacturer").distinct();
            List<Row> lsts = unq_manf.collectAsList();

            for (Row lst : lsts) {
                String man = lst.toString();
                man = man.replaceAll("[\\p{Ps}\\p{Pe}]", "");
                Dataset<Row> DF = src.filter("Manufacturer='" + man + "'");
                DF.show();
            }
        }
    }

INPUT TABLE-
+------+------------+--------------------+---+
|ItemID|Manufacturer| Category name|UPC|
+------+------------+--------------------+---+
| 804| ael|Brush & Broom Han...|123|
| 805| ael|Wheel Brush Parts...|124|
| 813| ael| Drivers Gloves|125|
| 632| west| Pipe Wrenches|126|
| 804| bil| Masonry Brushes|127|
| 497| west| Power Tools Other|128|
| 496| west| Power Tools Other|129|
| 495| bil| Hole Saws|130|
| 499| bil| Battery Chargers|131|
| 497| west| Power Tools Other|132|
+------+------------+--------------------+---+

OUTPUT-
+------------+
|Manufacturer|
+------------+
| ael|
| west|
| bil|
+------------+

+------+------------+--------------------+---+
|ItemID|Manufacturer| Category name|UPC|
+------+------------+--------------------+---+
| 804| ael|Brush & Broom Han...|123|
| 805| ael|Wheel Brush Parts...|124|
| 813| ael| Drivers Gloves|125|
+------+------------+--------------------+---+

+------+------------+-----------------+---+
|ItemID|Manufacturer| Category name|UPC|
+------+------------+-----------------+---+
| 632| west| Pipe Wrenches|126|
| 497| west|Power Tools Other|128|
| 496| west|Power Tools Other|129|
| 497| west|Power Tools Other|132|
+------+------------+-----------------+---+

+------+------------+----------------+---+
|ItemID|Manufacturer| Category name|UPC|
+------+------------+----------------+---+
| 804| bil| Masonry Brushes|127|
| 495| bil| Hole Saws|130|
| 499| bil|Battery Chargers|131|
+------+------------+----------------+---+

Thank you.

Best Answer

In this case you have two options:

  • First, collect the unique manufacturer values, then map over the
    resulting array, filtering once per value:

    val df = Seq(("HP", 1), ("Brother", 2), ("Canon", 3), ("HP", 5)).toDF("k", "v")
    val brands = df.select("k").distinct.collect.flatMap(_.toSeq)
    val BrandArray = brands.map(brand => df.where($"k" <=> brand))
    BrandArray.foreach { x =>
        x.show()
        println("---------------------------------------")
    }
  • You can also save the DataFrame partitioned by the manufacturer column,
    so each distinct value ends up in its own directory:

    df.write.partitionBy("k").format("parquet").saveAsTable("brands")
  • Regarding apache-spark - Splitting a Dataset by column value in Spark, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/42645836/
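Since the question asks for a Java solution, the core idea behind speeding up the original loop can be sketched in plain Java without Spark (the class name, method name, and sample data below are made up for illustration): group the rows by the manufacturer column in a single pass, rather than re-scanning the whole source once per distinct manufacturer, which is what makes the per-value filter loop slow.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class SplitByKey {

    // Split rows into one group per manufacturer in a single pass.
    // Each row is a String[] where index 0 = ItemID and index 1 = Manufacturer.
    static Map<String, List<String[]>> splitByManufacturer(List<String[]> rows) {
        return rows.stream()
                .collect(Collectors.groupingBy(row -> row[1]));
    }

    public static void main(String[] args) {
        List<String[]> rows = List.of(
                new String[]{"804", "ael"},
                new String[]{"805", "ael"},
                new String[]{"632", "west"},
                new String[]{"495", "bil"});

        // One traversal of the input produces every per-manufacturer group.
        splitByManufacturer(rows).forEach((man, grp) ->
                System.out.println(man + ": " + grp.size() + " row(s)"));
    }
}
```

In Spark terms this is the difference between N separate `filter` scans over the source and a single grouped pass (or a single partitioned write, as in the second option above).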
