Forward filling in .NET for Spark
I am looking at the window functions on a Spark DataFrame in .NET (C#).
I have a DataFrame df with Year, Month, Day, Hour, Minute, ID, Type and Value columns:
| Year | Month | Day | Hour | Minute | ID | Type | Value |
| 2021 | 3 | 4 | 8 | 9 | 87 | Type1 | 380.5 |
| 2021 | 3 | 4 | 8 | 10 | null | null | null |
| 2021 | 3 | 4 | 8 | 11 | null | null | null |
| 2021 | 3 | 4 | 8 | 12 | null | null | null |
| 2021 | 3 | 4 | 8 | 13 | 87 | Type1 | 0.0 |
| 2021 | 3 | 4 | 8 | 14 | 87 | Type1 | 0.0 |
I would like to fill the null rows with the values from the previous row, ordered by Year, Month, Day, Hour, Minute, like this:
| Year | Month | Day | Hour | Minute | ID | Type | Value |
| 2021 | 3 | 4 | 8 | 9 | 87 | Type1 | 380.5 |
| 2021 | 3 | 4 | 8 | 10 | 87 | Type1 | 380.5 |
| 2021 | 3 | 4 | 8 | 11 | 87 | Type1 | 380.5 |
| 2021 | 3 | 4 | 8 | 12 | 87 | Type1 | 380.5 |
| 2021 | 3 | 4 | 8 | 13 | 87 | Type1 | 0.0 |
| 2021 | 3 | 4 | 8 | 14 | 87 | Type1 | 0.0 |
So far I have found solutions that use a Window and the Lag function in Scala, but I am not sure how to do the same in C#. In Scala the window would be defined as:
val window = Window.orderBy("Year", "Month", "Day", "Hour", "Minute")
and I would like to add a new value column using something like:
var filledDataFrame = df.WithColumn("newValue", Functions.When(df["Value"].IsNull(), Functions.Lag(df["Value"], 1).Over(window)).Otherwise(df["Value"]));
How do I define the window and use the Lag function in .NET for Spark to forward fill the null values?
You were really close. To use Lag and a Window in .NET for Apache Spark you need:
var spark = SparkSession.Builder().GetOrCreate();

// Recreate the sample DataFrame from the question
var df = spark.CreateDataFrame(new List<GenericRow>()
{
    new GenericRow(new object[] {2021, 3, 4, 8, 9, 87, "Type1", 380.5}),
    new GenericRow(new object[] {2021, 3, 4, 8, 10, null, null, null}),
    new GenericRow(new object[] {2021, 3, 4, 8, 11, null, null, null}),
    new GenericRow(new object[] {2021, 3, 4, 8, 12, null, null, null}),
    new GenericRow(new object[] {2021, 3, 4, 8, 13, 87, "Type1", 0.0}),
    new GenericRow(new object[] {2021, 3, 4, 8, 14, 87, "Type1", 0.0})
}, new StructType(new List<StructField>()
{
    new StructField("Year", new IntegerType()),
    new StructField("Month", new IntegerType()),
    new StructField("Day", new IntegerType()),
    new StructField("Hour", new IntegerType()),
    new StructField("Minute", new IntegerType()),
    new StructField("ID", new IntegerType()),
    new StructField("Type", new StringType()),
    new StructField("Value", new DoubleType()),
}));

// Order the rows chronologically (add PartitionBy if you have more than one series)
var window = Window.OrderBy("Year", "Month", "Day", "Hour", "Minute");

// Where Value is null, take the Value from the previous row; otherwise keep the row's own Value
var filledDataFrame = df.WithColumn("newValue",
    Functions.When(df["Value"].IsNull(),
        Functions.Lag(df["Value"], 1).Over(window))
    .Otherwise(df["Value"]));

filledDataFrame.Show(1000, 10000);
This results in:
+----+-----+---+----+------+----+-----+-----+--------+
|Year|Month|Day|Hour|Minute| ID| Type|Value|newValue|
+----+-----+---+----+------+----+-----+-----+--------+
|2021| 3| 4| 8| 9| 87|Type1|380.5| 380.5|
|2021| 3| 4| 8| 10|null| null| null| 380.5|
|2021| 3| 4| 8| 11|null| null| null| null|
|2021| 3| 4| 8| 12|null| null| null| null|
|2021| 3| 4| 8| 13| 87|Type1| 0.0| 0.0|
|2021| 3| 4| 8| 14| 87|Type1| 0.0| 0.0|
+----+-----+---+----+------+----+-----+-----+--------+
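(Side note, not part of the original answer: the When/IsNull/Otherwise pattern above can be written more compactly with Functions.Coalesce, which returns the first non-null of its arguments. A minimal sketch, assuming the same df and window as above:)
var filledDataFrame = df.WithColumn("newValue",
    Functions.Coalesce(df["Value"], Functions.Lag(df["Value"], 1).Over(window)));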
However, you probably want Last rather than Lag: Lag(1) always looks exactly one row back, so only the first null in a run gets filled, whereas Last with ignoreNulls set to true skips over nulls and returns the most recent non-null value in the window:
var spark = SparkSession.Builder().GetOrCreate();

// Same sample DataFrame as above
var df = spark.CreateDataFrame(new List<GenericRow>()
{
    new GenericRow(new object[] {2021, 3, 4, 8, 9, 87, "Type1", 380.5}),
    new GenericRow(new object[] {2021, 3, 4, 8, 10, null, null, null}),
    new GenericRow(new object[] {2021, 3, 4, 8, 11, null, null, null}),
    new GenericRow(new object[] {2021, 3, 4, 8, 12, null, null, null}),
    new GenericRow(new object[] {2021, 3, 4, 8, 13, 87, "Type1", 0.0}),
    new GenericRow(new object[] {2021, 3, 4, 8, 14, 87, "Type1", 0.0})
}, new StructType(new List<StructField>()
{
    new StructField("Year", new IntegerType()),
    new StructField("Month", new IntegerType()),
    new StructField("Day", new IntegerType()),
    new StructField("Hour", new IntegerType()),
    new StructField("Minute", new IntegerType()),
    new StructField("ID", new IntegerType()),
    new StructField("Type", new StringType()),
    new StructField("Value", new DoubleType()),
}));

var window = Window.OrderBy("Year", "Month", "Day", "Hour", "Minute");

// Last with ignoreNulls = true skips nulls and returns the most recent non-null Value in the window
var filledDataFrame = df.WithColumn("newValue",
    Functions.When(df["Value"].IsNull(),
        Functions.Last(df["Value"], true).Over(window))
    .Otherwise(df["Value"]));

filledDataFrame.Show(1000, 10000);
Which results in:
+----+-----+---+----+------+----+-----+-----+--------+
|Year|Month|Day|Hour|Minute| ID| Type|Value|newValue|
+----+-----+---+----+------+----+-----+-----+--------+
|2021| 3| 4| 8| 9| 87|Type1|380.5| 380.5|
|2021| 3| 4| 8| 10|null| null| null| 380.5|
|2021| 3| 4| 8| 11|null| null| null| 380.5|
|2021| 3| 4| 8| 12|null| null| null| 380.5|
|2021| 3| 4| 8| 13| 87|Type1| 0.0| 0.0|
|2021| 3| 4| 8| 14| 87|Type1| 0.0| 0.0|
+----+-----+---+----+------+----+-----+-----+--------+
Hope that helps!
ed
(The using directives needed to make this work:)
using System;
using System.Collections.Generic;
using Microsoft.Spark.Sql;
using Microsoft.Spark.Sql.Expressions;
using Microsoft.Spark.Sql.Types;
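The question asks to carry ID and Type forward as well, while the answer above only fills Value. The same Last-with-ignoreNulls idea can be applied to each column; the following is a minimal sketch of my own (not from the original answer), assuming the same df and using directives as above. It bounds the window frame explicitly from the start of the data up to the current row, so Last always returns the latest non-null value seen so far and the When/Otherwise guard is no longer needed:
// Sketch: forward-fill ID, Type and Value in one pass
var ffWindow = Window
    .OrderBy("Year", "Month", "Day", "Hour", "Minute")
    .RowsBetween(Window.UnboundedPreceding, Window.CurrentRow);

foreach (var columnName in new[] { "ID", "Type", "Value" })
{
    // Last(..., ignoreNulls: true) returns the latest non-null value in the frame;
    // for a non-null row that is simply the row's own value
    df = df.WithColumn(columnName,
        Functions.Last(Functions.Col(columnName), true).Over(ffWindow));
}

df.Show();
Note that a window without PartitionBy moves all rows to a single partition, which is fine for a sample this size but will not scale; partition by an appropriate key (for example a device or sensor ID) on a real dataset.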