Time-efficient gap filling of data in a DataFrame using .NET for Spark
I want to fill the gaps in my DataFrame using .NET for Spark.
The current DataFrame (rawData) contains data at one-minute intervals between reportFrom and reportTo:
DateTime reportFrom = new DateTime(2021, 3, 4, 0, 0, 0);
DateTime reportTo = new DateTime(2021, 3, 5, 0, 0, 0);
Some intervals are missing, and I would like to fill them with the last known value.
+----+-----+---+----+------+------------------+--------------------+------------------+
|Year|Month|Day|Hour|Minute|Id | Type| Value|
+----+-----+---+----+------+------------------+--------------------+------------------+
|2021| 3| 4| 0| 0| 87| Power| 0.0|
|2021| 3| 4| 0| 1| 87| Power| 0.0|
|2021| 3| 4| 0| 2| 87| Power| 0.0|
...
|2021| 3| 4| 14| 2| 87| Power| 380.0|
|2021| 3| 4| 14| 3| 87| Power| 380.0|
|2021| 3| 4| 14| 4| 87| Power| 380.0|
|2021| 3| 4| 14| 5| 87| Power| 380.0|
|2021| 3| 4| 14| 7| 87| Power| 380.0|
...
|2021| 3| 4| 22| 7| 87| Power| 0.0|
After the first step (inserting the missing minutes), the result I expect is:
+----+-----+---+----+------+------------------+--------------------+------------------+
|Year|Month|Day|Hour|Minute|Id | Type| Value|
+----+-----+---+----+------+------------------+--------------------+------------------+
|2021| 3| 4| 0| 0| 87| Power| 0.0|
|2021| 3| 4| 0| 1| 87| Power| 0.0|
|2021| 3| 4| 0| 2| 87| Power| 0.0|
...
|2021| 3| 4| 14| 2| 87| Power| 380.0|
|2021| 3| 4| 14| 3| 87| Power| 380.0|
|2021| 3| 4| 14| 4| 87| Power| 380.0|
|2021| 3| 4| 14| 5| 87| Power| 380.0|
|2021| 3| 4| 14| 6| null| null| null|
|2021| 3| 4| 14| 7| 87| Power| 380.0|
|2021| 3| 4| 14| 8| null| null| null|
...
|2021| 3| 4| 23| 59| null| null| null|
So far I have built a new DataFrame containing every minute and then performed a left outer join of the two DataFrames.
int inc = 1;
List<DateTime> timeList = new List<DateTime>();
while (reportFrom < reportTo)
{
    timeList.Add(reportFrom);
    reportFrom = reportFrom.AddMinutes(inc);
}

// Placeholder row (Year0 = -1) used to seed the DataFrame; it is filtered out below
var toFillTime0 = new List<object> { -1, 0, 0, 0, 0 };
var dataToFill = spark.CreateDataFrame(
    new List<GenericRow> { new GenericRow(toFillTime0.ToArray()) },
    new StructType( // schema
        new List<StructField>()
        {
            new StructField("Year0", new IntegerType()),
            new StructField("Month0", new IntegerType()),
            new StructField("Day0", new IntegerType()),
            new StructField("Hour0", new IntegerType()),
            new StructField("Minute0", new IntegerType()),
        }));

// One single-row DataFrame per minute, unioned into dataToFill (this is the slow part)
foreach (DateTime time in timeList)
{
    var toFillTime = new List<object> { time.Year, time.Month, time.Day, time.Hour, time.Minute };
    var dataToFillt = spark.CreateDataFrame(
        new List<GenericRow> { new GenericRow(toFillTime.ToArray()) },
        new StructType( // schema
            new List<StructField>()
            {
                new StructField("Year0", new IntegerType()),
                new StructField("Month0", new IntegerType()),
                new StructField("Day0", new IntegerType()),
                new StructField("Hour0", new IntegerType()),
                new StructField("Minute0", new IntegerType()),
            }));
    dataToFill = dataToFill.Union(dataToFillt);
}

dataToFill = dataToFill.Filter("Year0 > 0");

var toFillReportDataReq = dataToFill.Join(rawData,
    dataToFill["Year0"] == rawData["Year"] & dataToFill["Month0"] == rawData["Month"] & dataToFill["Day0"] == rawData["Day"]
    & dataToFill["Hour0"] == rawData["Hour"] & dataToFill["Minute0"] == rawData["Minute"], "left_outer");
A few rows of toFillReportDataReq look like this:
|2021| 3| 4| 22| 4| 87| Power| 0.0|
|2021| 3| 4| 22| 5| 87| Power| 0.0|
|2021| 3| 4| 22| 6| 87| Power| 0.0|
|2021| 3| 4| 22| 7| 87| Power| 0.0|
|2021| 3| 4| 22| 8| null| null| null|
|2021| 3| 4| 22| 9| null| null| null|
|2021| 3| 4| 22| 10| null| null| null|
|2021| 3| 4| 22| 11| null| null| null|
|2021| 3| 4| 22| 12| null| null| null|
|2021| 3| 4| 22| 13| null| null| null|
|2021| 3| 4| 22| 14| null| null| null|
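Replacing the nulls in the Value column is already handled with the window and last functions.
The nulls in the Id and Type columns are replaced with var id = 87 and "Power" using: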
toFillReportDataReq = toFillReportDataReq
    .WithColumn("Id", Functions.When(toFillReportDataReq["Id"].IsNull(), id)
        .Otherwise(toFillReportDataReq["Id"]))
    .WithColumn("Type", Functions.When(toFillReportDataReq["Type"].IsNull(), "Power")
        .Otherwise(toFillReportDataReq["Type"]));
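This approach returns the result I want, but it is slow (inefficient).
My questions are:
- Is there a more efficient way to create a new DataFrame containing every minute in the specified interval?
- Is there a way to avoid the join in this approach?
- What is the best way to set every value in the Id column to id and every value in the Type column to "Power"?
Thanks!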
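Here is the approach I would take:
- Build a DataFrame with one row for every minute you want to represent (I use spark.Range to project one row for each minute I need)
- For each id in the range, add that many minutes to the start date
- Use a left_outer join to join the dates to your original DataFrame so that you don't lose any rows
- Then use Last to fill in any gaps. Note that if the data starts with nulls, newValue stays null until the first non-null value appears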
using System;
using System.Collections.Generic;
using Microsoft.Spark.Sql;
using Microsoft.Spark.Sql.Expressions;
using Microsoft.Spark.Sql.Types;

namespace StackOverflow
{
    class Program
    {
        static void Main(string[] args)
        {
            var spark = SparkSession.Builder().GetOrCreate();

            //Sample data set - we will fill in the missing minutes
            var df = spark.CreateDataFrame(new List<GenericRow>()
            {
                new GenericRow(new object[] {2021, 3, 4, 8, 3, 87, "Type1", 380.5}),
                new GenericRow(new object[] {2021, 3, 4, 8, 10, null, null, null}),
                new GenericRow(new object[] {2021, 3, 4, 8, 20, null, null, null}),
                new GenericRow(new object[] {2021, 3, 4, 8, 25, null, null, null}),
                new GenericRow(new object[] {2021, 3, 4, 8, 35, 87, "Type1", 0.0}),
                new GenericRow(new object[] {2021, 3, 4, 8, 45, 87, "Type1", 0.0})
            }, new StructType(new List<StructField>()
            {
                new StructField("Year", new IntegerType()),
                new StructField("Month", new IntegerType()),
                new StructField("Day", new IntegerType()),
                new StructField("Hour", new IntegerType()),
                new StructField("Minute", new IntegerType()),
                new StructField("ID", new IntegerType()),
                new StructField("Type", new StringType()),
                new StructField("Value", new DoubleType()),
            }));

            //start and end time
            var reportFrom = new DateTime(2021, 3, 4, 7, 0, 0);
            var reportTo = new DateTime(2021, 3, 4, 9, 0, 0);

            //convert start time to unix epoch as we can't pass a DateTime to spark (yet!)
            //cast to long so Spark receives whole seconds rather than a double
            var unixFromTime = (long) (reportFrom - new DateTime(1970, 1, 1, 0, 0, 0, 0)).TotalSeconds;

            //how many total rows do we need?
            var minutesToCreate = reportTo.Subtract(reportFrom).TotalMinutes;

            //create a dataframe with 1 row for every minute we need (spark.Range yields a single "id" column)
            var everyMinute = spark.Range((long) minutesToCreate);

            //Add the reportFrom unix epoch
            everyMinute = everyMinute.WithColumn("BaseTime", Functions.Lit(unixFromTime));

            //add to the unix epoch, the id (incrementing number) multiplied by 60 - if we didn't mul(60) it would add seconds and not minutes
            everyMinute = everyMinute.WithColumn("Time",
                Functions.Lit(unixFromTime)
                    .Plus(Functions.Col("id").Cast("Int").Multiply(Functions.Lit(60))));

            //convert the unix epoch to an actual timestamp and drop all the intermediate columns
            //note: FromUnixTime renders the time in the Spark session time zone, so the Hour values
            //only match reportFrom when spark.sql.session.timeZone is UTC
            everyMinute = everyMinute.WithColumn("Date",
                Functions.ToTimestamp(Functions.FromUnixTime(Functions.Col("Time")))).Select("Date");

            //convert timestamp into individual columns
            everyMinute = everyMinute.WithColumn("Year", Functions.Year(Functions.Col("Date")));
            everyMinute = everyMinute.WithColumn("Month", Functions.Month(Functions.Col("Date")));
            everyMinute = everyMinute.WithColumn("Day", Functions.DayOfMonth(Functions.Col("Date")));
            everyMinute = everyMinute.WithColumn("Hour", Functions.Hour(Functions.Col("Date")));
            everyMinute = everyMinute.WithColumn("Minute", Functions.Minute(Functions.Col("Date")));

            //join both data frames so...
            var dfAllData = everyMinute.Join(df, new List<string>() {"Year", "Month", "Day", "Hour", "Minute"}, "left_outer");

            //add in data using Last (ignoreNulls = true picks the most recent non-null Value)
            var window = Window.OrderBy("Year", "Month", "Day", "Hour", "Minute");
            var filledDataFrame = dfAllData.WithColumn("newValue",
                Functions.When(dfAllData["Value"].IsNull(),
                        Functions.Last(dfAllData["Value"], true).Over(window))
                    .Otherwise(dfAllData["Value"]));

            filledDataFrame.Show(1000, 10000);
        }
    }
}
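The code above fills Value but leaves the third question (setting Id and Type) open. The following is only a sketch of one way it could be handled, not part of the original answer. It assumes a joined DataFrame shaped like dfAllData, with columns Year, Month, Day, Hour, Minute, ID, Type and Value. The names GapFilling and FillGaps are hypothetical. Functions.Coalesce keeps an existing value and falls back to the constant only where the column is null, which matches the When/Otherwise pair in the question. The window frame is written out explicitly, so Last with ignoreNulls needs no When/Otherwise wrapper.

```csharp
using Microsoft.Spark.Sql;
using Microsoft.Spark.Sql.Expressions;

namespace StackOverflow
{
    static class GapFilling
    {
        // Hypothetical helper, not from the original answer.
        // joined: result of the left_outer join (one row per minute, nulls in the gaps).
        // id, type: constants to use where ID / Type are null.
        public static DataFrame FillGaps(DataFrame joined, int id, string type)
        {
            // Every row from the start of the data up to and including the current row.
            var window = Window
                .OrderBy("Year", "Month", "Day", "Hour", "Minute")
                .RowsBetween(Window.UnboundedPreceding, Window.CurrentRow);

            return joined
                // Keep existing values, use the constant only where the column is null.
                .WithColumn("ID", Functions.Coalesce(Functions.Col("ID"), Functions.Lit(id)))
                .WithColumn("Type", Functions.Coalesce(Functions.Col("Type"), Functions.Lit(type)))
                // Last non-null Value seen so far; still null before the first known value.
                .WithColumn("Value", Functions.Last(Functions.Col("Value"), true).Over(window));
        }
    }
}
```

It would be called as var filled = GapFilling.FillGaps(dfAllData, 87, "Power");. A window without PartitionBy makes Spark process all rows in a single partition, which should be acceptable for the 1,440 rows of one day but is worth revisiting for larger ranges.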