
java - How to add some information to a DataFrame?


I have a very simple DataFrame.

val df = Seq(
("Alex", "2018-01-01 00:00:00", "2018-02-01 00:00:00", "OUT"),
("Bob", "2018-02-01 00:00:00", "2018-02-05 00:00:00", "IN"),
("Mark", "2018-02-01 00:00:00", "2018-03-01 00:00:00", "IN"),
("Mark", "2018-05-01 00:00:00", "2018-08-01 00:00:00", "OUT"),
("Meggy", "2018-02-01 00:00:00", "2018-02-01 00:00:00", "OUT")
).toDF("NAME", "START_DATE", "END_DATE", "STATUS")

df.show()

In my Scala project I convert this DataFrame to a CSV file. I need to add some information at the beginning, as you can see in this example:

| REQUEST_DATE | 2019-02-05 20:00:00 |
| USER         | Kate                |
| SEARCH_TYPE  | Global              |

| NAME  | START_DATE          | END_DATE            | STATUS |
| Alex  | 2018-01-01 00:00:00 | 2018-02-01 00:00:00 | OUT    |
| Bob   | 2018-02-01 00:00:00 | 2018-02-05 00:00:00 | IN     |
| Mark  | 2018-02-01 00:00:00 | 2018-03-01 00:00:00 | IN     |
| Mark  | 2018-05-01 00:00:00 | 2018-08-01 00:00:00 | OUT    |
| Meggy | 2018-02-01 00:00:00 | 2018-02-01 00:00:00 | OUT    |

I tried to create a new DataFrame and union them together. Unfortunately, you can't union two DataFrames with different schemas.
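For instance, a minimal sketch of the kind of attempt that fails (the info DataFrame and its column names are hypothetical):

val info = Seq(("REQUEST_DATE", "2019-02-05 20:00:00")).toDF("KEY", "VALUE")

// Both of these throw an AnalysisException:
// info.union(df)        // union() requires the same number of columns
// info.unionByName(df)  // unionByName() requires matching column names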

Best Answer

Assuming you don't want to do it after the file has been written to disk, you can:

1. Convert everything in both dataframes to strings. However, the output will look like this:

| REQUEST_DATE | 2019-02-05 20:00:00 |                     |        |
| USER         | Kate                |                     |        |
| SEARCH_TYPE  | Global              |                     |        |
|              |                     |                     |        |
| NAME         | START_DATE          | END_DATE            | STATUS |
| Alex         | 2018-01-01 00:00:00 | 2018-02-01 00:00:00 | OUT    |
| Bob          | 2018-02-01 00:00:00 | 2018-02-05 00:00:00 | IN     |
| Mark         | 2018-02-01 00:00:00 | 2018-03-01 00:00:00 | IN     |
| Mark         | 2018-05-01 00:00:00 | 2018-08-01 00:00:00 | OUT    |
| Meggy        | 2018-02-01 00:00:00 | 2018-02-01 00:00:00 | OUT    |
2. Build your custom output writer, which adds the header prior to saving. You can find more info there - look for the save/write section. A rough sketch of this approach follows below.
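A minimal sketch of option 2 in Scala, assuming the result is small enough to collect to the driver (the path and header values are placeholders, and no CSV quoting/escaping is done):

import java.io.PrintWriter
import org.apache.spark.sql.DataFrame

def writeCsvWithHeader(df: DataFrame, path: String): Unit = {
  val out = new PrintWriter(path)
  try {
    // Free-form header block, written before the actual CSV content.
    out.println("REQUEST_DATE,2019-02-05 20:00:00")
    out.println("USER,Kate")
    out.println("SEARCH_TYPE,Global")
    out.println()
    // Column names, then the data rows, comma-separated.
    out.println(df.columns.mkString(","))
    df.collect().foreach(row => out.println(row.toSeq.mkString(",")))
  } finally {
    out.close()
  }
}

// Usage: writeCsvWithHeader(df, "/tmp/report.csv")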
Update

If you want to do #1, here is the code that transforms the first dataframe (the one with the data):

Dataset<Row> transitionDf = dataDf
    .withColumn("_c1", dataDf.col("NAME"))
    .withColumn("_c2", dataDf.col("START_DATE").cast(DataTypes.StringType))
    .withColumn("_c3", dataDf.col("END_DATE").cast(DataTypes.StringType))
    .withColumn("_c4", dataDf.col("STATUS").cast(DataTypes.StringType))
    .drop("NAME")
    .drop("START_DATE")
    .drop("END_DATE")
    .drop("STATUS");

The key is to cast() your columns so that you can then use unionByName() to merge the two dataframes. The whole code in Java (I don't use Scala) would look like this:

package net.jgp.labs.spark.l901Union;

import java.sql.Date;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

/**
 * Use of unionByName() to create a complex header on a dataframe.
 *
 * @author jgp
 */
public class UnionApp {
  private SimpleDateFormat format = null;

  public UnionApp() {
    this.format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  }

  /**
   * main() is your entry point to the application.
   *
   * @param args
   * @throws ParseException
   */
  public static void main(String[] args) throws ParseException {
    UnionApp app = new UnionApp();
    app.start();
  }

  /**
   * The processing code.
   *
   * @throws ParseException
   */
  private void start() throws ParseException {
    // Creates a session on a local master
    SparkSession spark = SparkSession.builder()
        .appName("expr()")
        .master("local")
        .getOrCreate();

    // DateTimeFormatter formatter = DateTimeFormatter.ofPattern(
    //     "yyyy-MM-dd HH:mm:ss", Locale.ENGLISH);

    // Data
    StructType dataSchema = DataTypes.createStructType(new StructField[] {
        DataTypes.createStructField("NAME", DataTypes.StringType, false),
        DataTypes.createStructField("START_DATE", DataTypes.DateType, false),
        DataTypes.createStructField("END_DATE", DataTypes.DateType, false),
        DataTypes.createStructField("STATUS", DataTypes.StringType, false) });
    List<Row> dataRows = new ArrayList<Row>();
    dataRows.add(RowFactory.create("Alex", toDate("2018-01-01 00:00:00"),
        toDate("2018-02-01 00:00:00"), "OUT"));
    dataRows.add(RowFactory.create("Bob", toDate("2018-02-01 00:00:00"),
        toDate("2018-02-05 00:00:00"), "IN"));
    dataRows.add(RowFactory.create("Mark", toDate("2018-02-01 00:00:00"),
        toDate("2018-03-01 00:00:00"), "IN"));
    dataRows.add(RowFactory.create("Mark", toDate("2018-05-01 00:00:00"),
        toDate("2018-08-01 00:00:00"), "OUT"));
    dataRows.add(RowFactory.create("Meggy", toDate("2018-02-01 00:00:00"),
        toDate("2018-02-01 00:00:00"), "OUT"));
    Dataset<Row> dataDf = spark.createDataFrame(dataRows, dataSchema);
    dataDf.show();
    dataDf.printSchema();

    // Header
    StructType headerSchema = DataTypes.createStructType(new StructField[] {
        DataTypes.createStructField("_c1", DataTypes.StringType, false),
        DataTypes.createStructField("_c2", DataTypes.StringType, false),
        DataTypes.createStructField("_c3", DataTypes.StringType, false),
        DataTypes.createStructField("_c4", DataTypes.StringType, false) });
    List<Row> headerRows = new ArrayList<Row>();
    headerRows.add(RowFactory.create("REQUEST_DATE",
        format.format(new java.util.Date()), "", ""));
    headerRows.add(RowFactory.create("USER", "Kate", "", ""));
    headerRows.add(RowFactory.create("SEARCH_TYPE", "Global", "", ""));
    headerRows.add(RowFactory.create("", "", "", ""));
    headerRows.add(RowFactory.create("NAME", "START_DATE", "END_DATE", "STATUS"));
    Dataset<Row> headerDf = spark.createDataFrame(headerRows, headerSchema);
    headerDf.show(false);
    headerDf.printSchema();

    // Transition
    Dataset<Row> transitionDf = dataDf
        .withColumn("_c1", dataDf.col("NAME"))
        .withColumn("_c2", dataDf.col("START_DATE").cast(DataTypes.StringType))
        .withColumn("_c3", dataDf.col("END_DATE").cast(DataTypes.StringType))
        .withColumn("_c4", dataDf.col("STATUS").cast(DataTypes.StringType))
        .drop("NAME")
        .drop("START_DATE")
        .drop("END_DATE")
        .drop("STATUS");
    transitionDf.show(false);
    transitionDf.printSchema();

    // Union
    Dataset<Row> unionDf = headerDf.unionByName(transitionDf);
    unionDf.show(false);
    unionDf.printSchema();
  }

  private Date toDate(String dateAsText) throws ParseException {
    java.util.Date parsed;
    parsed = format.parse(dateAsText);
    return new Date(parsed.getTime());
  }
}

I saved it as part of my Spark and Java labs on GitHub. The equivalent Scala code would probably be a bit more compact :).
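For reference, an untested Scala sketch of the same approach (reusing df from the question; spark.implicits._ is assumed to be in scope):

import org.apache.spark.sql.functions.col

// All-string header rows, using the column names the data will be cast to.
val headerDf = Seq(
  ("REQUEST_DATE", "2019-02-05 20:00:00", "", ""),
  ("USER", "Kate", "", ""),
  ("SEARCH_TYPE", "Global", "", ""),
  ("", "", "", ""),
  ("NAME", "START_DATE", "END_DATE", "STATUS")
).toDF("_c1", "_c2", "_c3", "_c4")

// Cast every data column to string and rename it to match the header schema.
val transitionDf = df.select(
  col("NAME").as("_c1"),
  col("START_DATE").cast("string").as("_c2"),
  col("END_DATE").cast("string").as("_c3"),
  col("STATUS").cast("string").as("_c4"))

val unionDf = headerDf.unionByName(transitionDf)
unionDf.show(false)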

Regarding "java - How to add some information to a DataFrame?", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/54525966/
