
java - Spark /Java : Dataframe String column to Struct

Reposted · Author: 行者123 · Updated: 2023-11-30 01:53:49

I have a dataset like this:

+---+-------------------+-----------------------+
|id |time |range |
+---+-------------------+-----------------------+
|id1|2019-03-11 05:00:00|00h00-07h30;23h30-23h59|
|id2|2019-03-11 09:00:00|00h00-07h30;23h30-23h59|
|id3|2019-03-11 10:30:00|00h00-07h30;23h30-23h59|
+---+-------------------+-----------------------+

with the schema:

root
|-- id: string (nullable = true)
|-- time: string (nullable = true)
|-- range: string (nullable = true)

I want to flag the rows where the hour/minute of the time column falls within one of the hour/minute intervals in the range column:

+---+-------------------+-----------------------+-----------+
|id |time |range |between |
+---+-------------------+-----------------------+-----------+
|id1|2019-03-11 05:00:00|00h00-07h30;23h30-23h59|true |
|id2|2019-03-11 09:00:00|00h00-07h30;23h30-23h59|false |
|id3|2019-03-11 10:30:00|00h00-07h30;23h30-23h59|false |
+---+-------------------+-----------------------+-----------+

I know that in Scala I would have to transform the range column into something like:

array(named_struct("start", "00h00", "end", "03h00"), named_struct("start", "15h30", "end", "17h30"), named_struct("start", "21h00", "end", "23h59"))

But I haven't found a way to do this in Java. How can I achieve it, or is there a better solution?
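As an aside, the struct-array expression above can also be produced in Java by building the SQL expression as a string and passing it to `org.apache.spark.sql.functions.expr`. A minimal sketch of such a string builder, with hypothetical class and method names (`RangeExpr`, `toStructArrayExpr`):

```java
public class RangeExpr {
    // Builds a Spark SQL expression string such as
    //   array(named_struct('start', '00h00', 'end', '07h30'), ...)
    // from a range string such as "00h00-07h30;23h30-23h59".
    static String toStructArrayExpr(String ranges) {
        StringBuilder sb = new StringBuilder("array(");
        String[] parts = ranges.split(";");
        for (int i = 0; i < parts.length; i++) {
            String[] bounds = parts[i].split("-");
            if (i > 0) {
                sb.append(", ");
            }
            sb.append("named_struct('start', '").append(bounds[0])
              .append("', 'end', '").append(bounds[1]).append("')");
        }
        return sb.append(")").toString();
    }

    public static void main(String[] args) {
        System.out.println(toStructArrayExpr("00h00-07h30;23h30-23h59"));
    }
}
```

The resulting string could then be handed to `functions.expr(...)` inside a `withColumn` call to obtain an `array<struct<start,end>>` column. That said, the accepted answer below avoids structs entirely.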

Thanks.

Best Answer

One way you can do this is to:

  1. Normalize your time using Spark's static functions.
  2. Use a UDF (user-defined function) to check whether the value falls in a range.

Using the static functions:

df = df
    .withColumn(
        "date",
        date_format(col("time"), "yyyy-MM-dd HH:mm:ss.SSSS"))
    .withColumn("h", hour(col("date")))
    .withColumn("m", minute(col("date")))
    .withColumn("s", second(col("date")))
    .withColumn("event", expr("h*3600 + m*60 + s"))
    .drop("date")
    .drop("h")
    .drop("m")
    .drop("s");

If your dataframe looked like this before:

+---+-------------------+-----------------------+
|id |time |range |
+---+-------------------+-----------------------+
|id1|2019-03-11 05:00:00|00h00-07h30;23h30-23h59|
|id2|2019-03-11 09:00:00|00h00-07h30;23h30-23h59|
|id3|2019-03-11 10:30:00|00h00-07h30;23h30-23h59|
+---+-------------------+-----------------------+

afterwards, it should look like this, with event holding the time of day in seconds since midnight:

+---+-------------------+-----------------------+-----+
|id |time |range |event|
+---+-------------------+-----------------------+-----+
|id1|2019-03-11 05:00:00|00h00-07h30;23h30-23h59|18000|
|id2|2019-03-11 09:00:00|00h00-07h30;23h30-23h59|32400|
|id3|2019-03-11 10:30:00|00h00-07h30;23h30-23h59|37800|
+---+-------------------+-----------------------+-----+

Using the UDF:

df = df.withColumn("between",
    callUDF("inRange", col("range"), col("event")));

The result will be:

+---+-------------------+-----------------------+-----+-------+
|id |time |range |event|between|
+---+-------------------+-----------------------+-----+-------+
|id1|2019-03-11 05:00:00|00h00-07h30;23h30-23h59|18000|true |
|id2|2019-03-11 09:00:00|00h00-07h30;23h30-23h59|32400|false |
|id3|2019-03-11 10:30:00|00h00-07h30;23h30-23h59|37800|false |
+---+-------------------+-----------------------+-----+-------+

InRangeUdf

Your UDF would look like this:

package net.jgp.books.sparkInAction.ch14.lab900_in_range;

import org.apache.spark.sql.api.java.UDF2;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class InRangeUdf implements UDF2<String, Integer, Boolean> {
  private static Logger log = LoggerFactory
      .getLogger(InRangeUdf.class);

  private static final long serialVersionUID = -21621751L;

  @Override
  public Boolean call(String range, Integer event) throws Exception {
    log.debug("-> call({}, {})", range, event);
    String[] ranges = range.split(";");
    for (int i = 0; i < ranges.length; i++) {
      log.debug("Processing range #{}: {}", i, ranges[i]);
      String[] hours = ranges[i].split("-");
      int start =
          Integer.valueOf(hours[0].substring(0, 2)) * 3600 +
          Integer.valueOf(hours[0].substring(3)) * 60;
      int end =
          Integer.valueOf(hours[1].substring(0, 2)) * 3600 +
          Integer.valueOf(hours[1].substring(3)) * 60;
      log.debug("Checking between {} and {}", start, end);
      if (event >= start && event <= end) {
        return true;
      }
    }
    return false;
  }
}
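The parsing logic in the UDF can be exercised outside of Spark. A minimal standalone sketch (class and method names here are hypothetical, not part of the answer's code):

```java
public class RangeCheck {
    // Converts an "HHhMM" token such as "07h30" into seconds since midnight.
    static int toSeconds(String hm) {
        return Integer.parseInt(hm.substring(0, 2)) * 3600
             + Integer.parseInt(hm.substring(3)) * 60;
    }

    // Returns true if the event (seconds since midnight) falls inside
    // any of the semicolon-separated "HHhMM-HHhMM" intervals.
    static boolean inRange(String ranges, int event) {
        for (String r : ranges.split(";")) {
            String[] bounds = r.split("-");
            if (event >= toSeconds(bounds[0]) && event <= toSeconds(bounds[1])) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // 05:00:00 is 18000 s, inside 00h00-07h30 (0..27000 s)
        System.out.println(inRange("00h00-07h30;23h30-23h59", 18000)); // true
        // 09:00:00 is 32400 s, outside both intervals
        System.out.println(inRange("00h00-07h30;23h30-23h59", 32400)); // false
    }
}
```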

Driver code

Your driver code would look like this:

package net.jgp.books.sparkInAction.ch14.lab900_in_range;

import static org.apache.spark.sql.functions.*;

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

/**
 * Custom UDF to check if in range.
 *
 * @author jgp
 */
public class InCustomRangeApp {

  /**
   * main() is your entry point to the application.
   *
   * @param args
   */
  public static void main(String[] args) {
    InCustomRangeApp app = new InCustomRangeApp();
    app.start();
  }

  /**
   * The processing code.
   */
  private void start() {
    // Creates a session on a local master
    SparkSession spark = SparkSession.builder()
        .appName("Custom UDF to check if in range")
        .master("local[*]")
        .getOrCreate();
    spark.udf().register(
        "inRange",
        new InRangeUdf(),
        DataTypes.BooleanType);

    Dataset<Row> df = createDataframe(spark);
    df.show(false);

    // Normalize the time column into an event value in seconds since midnight
    df = df
        .withColumn(
            "date",
            date_format(col("time"), "yyyy-MM-dd HH:mm:ss.SSSS"))
        .withColumn("h", hour(col("date")))
        .withColumn("m", minute(col("date")))
        .withColumn("s", second(col("date")))
        .withColumn("event", expr("h*3600 + m*60 + s"))
        .drop("date")
        .drop("h")
        .drop("m")
        .drop("s");
    df.show(false);

    // Flag each row with the result of the inRange UDF
    df = df.withColumn("between",
        callUDF("inRange", col("range"), col("event")));
    df.show(false);
  }

  private static Dataset<Row> createDataframe(SparkSession spark) {
    StructType schema = DataTypes.createStructType(new StructField[] {
        DataTypes.createStructField(
            "id",
            DataTypes.StringType,
            false),
        DataTypes.createStructField(
            "time",
            DataTypes.StringType,
            false),
        DataTypes.createStructField(
            "range",
            DataTypes.StringType,
            false) });

    List<Row> rows = new ArrayList<>();
    rows.add(RowFactory.create("id1", "2019-03-11 05:00:00",
        "00h00-07h30;23h30-23h59"));
    rows.add(RowFactory.create("id2", "2019-03-11 09:00:00",
        "00h00-07h30;23h30-23h59"));
    rows.add(RowFactory.create("id3", "2019-03-11 10:30:00",
        "00h00-07h30;23h30-23h59"));

    return spark.createDataFrame(rows, schema);
  }
}

Regarding java - Spark/Java: Dataframe String column to Struct, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/55160571/
