
c# - Forward Fill in .NET for Spark


I am looking at window functions for Spark DataFrames in .NET (C#).

I have a DataFrame df with the columns Year, Month, Day, Hour, Minute, ID, Type, and Value:

| Year | Month | Day | Hour | Minute | ID   | Type  | Value |
|------|-------|-----|------|--------|------|-------|-------|
| 2021 | 3     | 4   | 8    | 9      | 87   | Type1 | 380.5 |
| 2021 | 3     | 4   | 8    | 10     | null | null  | null  |
| 2021 | 3     | 4   | 8    | 11     | null | null  | null  |
| 2021 | 3     | 4   | 8    | 12     | null | null  | null  |
| 2021 | 3     | 4   | 8    | 13     | 87   | Type1 | 0.0   |
| 2021 | 3     | 4   | 8    | 14     | 87   | Type1 | 0.0   |

I want to fill the null rows with the values from the previous row, ordered by Year, Month, Day, Hour, Minute, like this:

| Year | Month | Day | Hour | Minute | ID | Type  | Value |
|------|-------|-----|------|--------|----|-------|-------|
| 2021 | 3     | 4   | 8    | 9      | 87 | Type1 | 380.5 |
| 2021 | 3     | 4   | 8    | 10     | 87 | Type1 | 380.5 |
| 2021 | 3     | 4   | 8    | 11     | 87 | Type1 | 380.5 |
| 2021 | 3     | 4   | 8    | 12     | 87 | Type1 | 380.5 |
| 2021 | 3     | 4   | 8    | 13     | 87 | Type1 | 0.0   |
| 2021 | 3     | 4   | 8    | 14     | 87 | Type1 | 0.0   |

So far I have found solutions that use Window and the Lag function in Scala, but I am not sure how to do the same in C#. In Scala the window would be defined as:

val window = Window.orderBy("Year", "Month", "Day", "Hour", "Minute")

and I would like to add a newValue column using:

var filledDataFrame = df.WithColumn("newValue", Functions.When(df["Value"].IsNull(), Functions.Lag(df["Value"], 1).Over(window)).Otherwise(df["Value"]));

How do I define a window in .NET for Spark and use the Lag function to forward fill the null values?

Best Answer

You are very close. To use Lag and Window in .NET for Apache Spark, you need:

var spark = SparkSession.Builder().GetOrCreate();

// Sample DataFrame; ID, Type and Value are left null where the gaps need to be filled.
var df = spark.CreateDataFrame(new List<GenericRow>()
{
    new GenericRow(new object[] {2021, 3, 4, 8, 9, 87, "Type1", 380.5}),
    new GenericRow(new object[] {2021, 3, 4, 8, 10, null, null, null}),
    new GenericRow(new object[] {2021, 3, 4, 8, 11, null, null, null}),
    new GenericRow(new object[] {2021, 3, 4, 8, 12, null, null, null}),
    new GenericRow(new object[] {2021, 3, 4, 8, 13, 87, "Type1", 0.0}),
    new GenericRow(new object[] {2021, 3, 4, 8, 14, 87, "Type1", 0.0})
}, new StructType(new List<StructField>()
{
    new StructField("Year", new IntegerType()),
    new StructField("Month", new IntegerType()),
    new StructField("Day", new IntegerType()),
    new StructField("Hour", new IntegerType()),
    new StructField("Minute", new IntegerType()),
    new StructField("ID", new IntegerType()),
    new StructField("Type", new StringType()),
    new StructField("Value", new DoubleType())
}));

// Order the window by the time columns so "the previous row" means the previous minute.
var window = Window.OrderBy("Year", "Month", "Day", "Hour", "Minute");

var filledDataFrame = df.WithColumn("newValue",
    Functions.When(df["Value"].IsNull(),
        Functions.Lag(df["Value"], 1).Over(window))
    .Otherwise(df["Value"]));

filledDataFrame.Show(1000, 10000);

Which gives:

+----+-----+---+----+------+----+-----+-----+--------+
|Year|Month|Day|Hour|Minute| ID| Type|Value|newValue|
+----+-----+---+----+------+----+-----+-----+--------+
|2021| 3| 4| 8| 9| 87|Type1|380.5| 380.5|
|2021| 3| 4| 8| 10|null| null| null| 380.5|
|2021| 3| 4| 8| 11|null| null| null| null|
|2021| 3| 4| 8| 12|null| null| null| null|
|2021| 3| 4| 8| 13| 87|Type1| 0.0| 0.0|
|2021| 3| 4| 8| 14| 87|Type1| 0.0| 0.0|
+----+-----+---+----+------+----+-----+-----+--------+

However, as the output shows, Lag(1) only looks one row back, so only the first null in a run is filled; the later nulls still see a null in the row before them. You probably want Last instead of Lag, because it can be told to skip null values:

var spark = SparkSession.Builder().GetOrCreate();

// Same sample DataFrame as above.
var df = spark.CreateDataFrame(new List<GenericRow>()
{
    new GenericRow(new object[] {2021, 3, 4, 8, 9, 87, "Type1", 380.5}),
    new GenericRow(new object[] {2021, 3, 4, 8, 10, null, null, null}),
    new GenericRow(new object[] {2021, 3, 4, 8, 11, null, null, null}),
    new GenericRow(new object[] {2021, 3, 4, 8, 12, null, null, null}),
    new GenericRow(new object[] {2021, 3, 4, 8, 13, 87, "Type1", 0.0}),
    new GenericRow(new object[] {2021, 3, 4, 8, 14, 87, "Type1", 0.0})
}, new StructType(new List<StructField>()
{
    new StructField("Year", new IntegerType()),
    new StructField("Month", new IntegerType()),
    new StructField("Day", new IntegerType()),
    new StructField("Hour", new IntegerType()),
    new StructField("Minute", new IntegerType()),
    new StructField("ID", new IntegerType()),
    new StructField("Type", new StringType()),
    new StructField("Value", new DoubleType())
}));

var window = Window.OrderBy("Year", "Month", "Day", "Hour", "Minute");

// Last with ignoreNulls set to true returns the most recent non-null Value up to the current row.
var filledDataFrame = df.WithColumn("newValue",
    Functions.When(df["Value"].IsNull(),
        Functions.Last(df["Value"], true).Over(window))
    .Otherwise(df["Value"]));

filledDataFrame.Show(1000, 10000);

The result is:

+----+-----+---+----+------+----+-----+-----+--------+
|Year|Month|Day|Hour|Minute| ID| Type|Value|newValue|
+----+-----+---+----+------+----+-----+-----+--------+
|2021| 3| 4| 8| 9| 87|Type1|380.5| 380.5|
|2021| 3| 4| 8| 10|null| null| null| 380.5|
|2021| 3| 4| 8| 11|null| null| null| 380.5|
|2021| 3| 4| 8| 12|null| null| null| 380.5|
|2021| 3| 4| 8| 13| 87|Type1| 0.0| 0.0|
|2021| 3| 4| 8| 14| 87|Type1| 0.0| 0.0|
+----+-----+---+----+------+----+-----+-----+--------+
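The question also asks for the ID and Type columns to be filled, not only Value. The answer above stops at Value, but the same Last-over-window pattern should apply to the other columns too. Below is a minimal sketch, not part of the original answer, assuming the schema and window defined above; note that with ignoreNulls set to true, Last already returns the current row's value when it is not null, so the When/Otherwise guard can be dropped:

// Sketch only: forward fill every nullable column with the last non-null value seen so far.
// Functions.Last(column, true) ignores nulls, and the ordered window runs from the start
// of the data up to the current row, so the current value is returned when it is not null.
var window = Window.OrderBy("Year", "Month", "Day", "Hour", "Minute");

var filled = df
    .WithColumn("ID", Functions.Last(df["ID"], true).Over(window))
    .WithColumn("Type", Functions.Last(df["Type"], true).Over(window))
    .WithColumn("Value", Functions.Last(df["Value"], true).Over(window));

filled.Show();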

Hope that helps!

Edit

(the using statements needed to make this work)

using System;
using System.Collections.Generic;
using Microsoft.Spark.Sql;
using Microsoft.Spark.Sql.Expressions;
using Microsoft.Spark.Sql.Types;

Regarding c# - forward fill in .NET for Spark, there is a similar question on Stack Overflow: https://stackoverflow.com/questions/66764291/
