使用 .NET for Spark 在数据框中高效地填充数据

Time Efficient gap filling data in dataframe using .NET for Spark

我想使用 .NET for Spark 来填补我 DataFrame 中的空白。

当前 DataFrame (rawData) 包含 reportFromreportTo

之间的分钟间隔的数据
DateTime reportFrom = new DateTime(2021, 3, 4, 0, 0, 0);
DateTime reportTo = new DateTime(2021, 3, 5, 0, 0, 0);

缺少一些间隔,我想用最后一个已知值填充它们。

+----+-----+---+----+------+------------------+--------------------+------------------+
|Year|Month|Day|Hour|Minute|Id                |                Type|             Value|
+----+-----+---+----+------+------------------+--------------------+------------------+
|2021|    3|  4|   0|     0|                87|               Power|               0.0|
|2021|    3|  4|   0|     1|                87|               Power|               0.0|
|2021|    3|  4|   0|     2|                87|               Power|               0.0|
...
|2021|    3|  4|  14|     2|                87|               Power|             380.0|
|2021|    3|  4|  14|     3|                87|               Power|             380.0|
|2021|    3|  4|  14|     4|                87|               Power|             380.0|
|2021|    3|  4|  14|     5|                87|               Power|             380.0|
|2021|    3|  4|  14|     7|                87|               Power|             380.0|
...
|2021|    3|  4|  22|     7|                87|               Power|               0.0|

第一步(插入缺失的分钟数)后我​​期望的结果是:

+----+-----+---+----+------+------------------+--------------------+------------------+
|Year|Month|Day|Hour|Minute|Id                |                Type|             Value|
+----+-----+---+----+------+------------------+--------------------+------------------+
|2021|    3|  4|   0|     0|                87|               Power|               0.0|
|2021|    3|  4|   0|     1|                87|               Power|               0.0|
|2021|    3|  4|   0|     2|                87|               Power|               0.0|
...
|2021|    3|  4|  14|     2|                87|               Power|             380.0|
|2021|    3|  4|  14|     3|                87|               Power|             380.0|
|2021|    3|  4|  14|     4|                87|               Power|             380.0|
|2021|    3|  4|  14|     5|                87|               Power|             380.0|
|2021|    3|  4|  14|     6|              null|                null|              null|
|2021|    3|  4|  14|     7|                87|               Power|             380.0|
|2021|    3|  4|  14|     8|              null|                null|              null|
...
|2021|    3|  4|  23|    59|              null|                null|              null|               

到目前为止,我曾经使用所有分钟创建一个新的 DataFrame,然后在两个数据帧上执行 left outer Join

int inc = 1;
List<DateTime> timeList = new List<DateTime>();
while (reportFrom < reportTo)
{
    timeList.Add(reportFrom);
    reportFrom = reportFrom.AddMinutes(inc);
}    

var toFillTime0 = new List<object> { -1, 0, 0, 0, 0 };

var dataToFill = spark.CreateDataFrame(
    new List<GenericRow> { new GenericRow(toFillTime0.ToArray()) },
    new StructType(                     //shema
    new List<StructField>()
    {
            new StructField("Year0", new IntegerType()),
            new StructField("Month0", new IntegerType()),
            new StructField("Day0", new IntegerType()),
            new StructField("Hour0", new IntegerType()),
            new StructField("Minute0", new IntegerType()),
    }));

foreach (DateTime time in timeList)
{

    var toFillTime = new List<object> { time.Year, time.Month, time.Day, time.Hour, time.Minute };

    var dataToFillt = spark.CreateDataFrame(
        new List<GenericRow> { new GenericRow(toFillTime.ToArray()) },
        new StructType(                     //shema
        new List<StructField>()
        {
            new StructField("Year0", new IntegerType()),
            new StructField("Month0", new IntegerType()),
            new StructField("Day0", new IntegerType()),
            new StructField("Hour0", new IntegerType()),
            new StructField("Minute0", new IntegerType()),
        }));

    dataToFill = dataToFill.Union(dataToFillt);

}

dataToFill = dataToFill.Filter("Year0 > 0");    

var toFillReportDataReq = dataToFill.Join(rawData,
                dataToFill["Year0"] == update10["Year"] & dataToFill["Month0"] == update10["Month"] & dataToFill["Day0"] == update10["Day"]
                & dataToFill["Hour0"] == update10["Hour"] & dataToFill["Minute0"] == update10["Minute"], "left_outer");    

几行toFillReportDataReq如下所示:

|2021|    3|  4|  22|     4|                87|               Power|               0.0|
|2021|    3|  4|  22|     5|                87|               Power|               0.0|
|2021|    3|  4|  22|     6|                87|               Power|               0.0|
|2021|    3|  4|  22|     7|                87|               Power|               0.0|
|2021|    3|  4|  22|     8|              null|                null|              null|
|2021|    3|  4|  22|     9|              null|                null|              null|
|2021|    3|  4|  22|    10|              null|                null|              null|
|2021|    3|  4|  22|    11|              null|                null|              null|
|2021|    3|  4|  22|    12|              null|                null|              null|
|2021|    3|  4|  22|    13|              null|                null|              null|
|2021|    3|  4|  22|    14|              null|                null|              null|

已使用 windowlast 函数覆盖 Values 列中空值的替换。

IdType 列中的值被替换为 var id = 87 和“功率”使用

toFillReportDataReq = toFillReportDataReq.WithColumn("Id", Functions.When(toFillReportDataReq["Id"].IsNull(), id)
   .Otherwise(toFillReportDataReq["Id"]))
   .WithColumn("Type", Functions.When(toFillReportDataReq["Type"].IsNull(), "Power")
    .Otherwise(toFillReportDataReq["Type"]));

这个方法returns我想要的结果,但是很耗时(效率低)。

我的问题如下:

谢谢!

这是我会采用的方法:

  1. 构建一个 DataFrame,让您想要表示的每一分钟都有一行(我使用 spark.Range 为我需要的每一分钟投影一行)
  2. 对于范围内的每个 ID,将开始日期增加一分钟
  3. 使用 left_outer 连接将日期连接到您的原始数据框,这样您就不会丢失任何行
  4. 然后使用 Last 来填补任何空白 - 请注意,如果您以 null 开头,则 newValue 将为 null,直到您获得一些数据
using System;
using System.Collections.Generic;
using Microsoft.Spark.Sql;
using Microsoft.Spark.Sql.Expressions;
using Microsoft.Spark.Sql.Types;

namespace Whosebug
{
    class Program
    {
        static void Main(string[] args)
        {
            var spark = SparkSession.Builder().GetOrCreate();
            
            //Sample data set - we will fill in the missing minutes
            var df = spark.CreateDataFrame(new List<GenericRow>()
            {
                new GenericRow(new object[] {2021, 3, 4, 8, 3, 87, "Type1", 380.5}),
                new GenericRow(new object[] {2021, 3, 4, 8, 10, null, null, null}),
                new GenericRow(new object[] {2021, 3, 4, 8, 20, null, null, null}),
                new GenericRow(new object[] {2021, 3, 4, 8, 25, null, null, null}),
                new GenericRow(new object[] {2021, 3, 4, 8, 35, 87, "Type1", 0.0}),
                new GenericRow(new object[] {2021, 3, 4, 8, 45, 87, "Type1", 0.0})
            }, new StructType(new List<StructField>()
            {
                new StructField("Year", new IntegerType()),
                new StructField("Month", new IntegerType()),
                new StructField("Day", new IntegerType()),
                new StructField("Hour", new IntegerType()),
                new StructField("Minute", new IntegerType()),
                new StructField("ID", new IntegerType()),
                new StructField("Type", new StringType()),
                new StructField("Value", new DoubleType()),
            }));
            
            //start and end time
            var reportFrom = new DateTime(2021, 3, 4, 7, 0, 0);
            var reportTo = new DateTime(2021, 3, 4, 9, 0, 0);
            
            //convert start time to unix epoch as we can't pass a DateTime to spark (yet!)
            var unixFromTime = (reportFrom - new DateTime(1970, 1, 1, 0, 0, 0, 0)).TotalSeconds;
            
            //how many total rows do we need?
            var minutesToCreate = reportTo.Subtract(reportFrom).TotalMinutes;
            
            //create a dataframe with 1 row for every minute we need
            var everyMinute = spark.Range((long) minutesToCreate);
            
            //Add the reportFrom unix epoch
            everyMinute = everyMinute.WithColumn("BaseTime", Functions.Lit(unixFromTime));
            
            //add to the unix epoch, the Id (incrementing number) multiplied by 60 - if we didn't mul(60) it would add seconds and not minutes
            everyMinute = everyMinute.WithColumn("Time",
                Functions.Lit(unixFromTime)
                    .Plus(Functions.Col("Id").Cast("Int").Multiply(Functions.Lit(60))));
            
            //convert the unix epoch to an actual timestamp and drop all the intermediate columns
            everyMinute = everyMinute.WithColumn("Date",
                Functions.ToTimestamp(Functions.FromUnixTime(Functions.Col("Time")))).Select("Date");
                
            //convert timestamp into individual columns

            everyMinute = everyMinute.WithColumn("Year", Functions.Year(Functions.Col("Date")));
            everyMinute = everyMinute.WithColumn("Month", Functions.Month(Functions.Col("Date")));
            everyMinute = everyMinute.WithColumn("Day", Functions.DayOfMonth(Functions.Col("Date")));
            everyMinute = everyMinute.WithColumn("Hour", Functions.Hour(Functions.Col("Date")));
            everyMinute = everyMinute.WithColumn("Minute", Functions.Minute(Functions.Col("Date")));

            //join both data frames so...
            var dfAllData = everyMinute.Join(df, new List<string>() {"Year", "Month", "Day", "Hour", "Minute"}, "left_outer");
            
            //add in data using Last
            var window = Window.OrderBy("Year", "Month", "Day", "Hour", "Minute");
            var filledDataFrame = dfAllData.WithColumn("newValue",
                Functions.When(dfAllData["Value"].IsNull(),
                        Functions.Last(dfAllData["Value"], true).Over(window))
                    .Otherwise(dfAllData["Value"]));

            filledDataFrame.Show(1000, 10000);
        }
    }
}

编辑