Pentaho 中缺失值的插值

Interpolation of missing values in Pentaho

我正在尝试使用 Pentaho pdi 填充缺失值。

输入:

期望的输出:

目前只找到 Filling data gaps in a stream in Pentaho Data Integration, is it possible? 但它会填上最后一个已知值。

可能,我认为我可以使用上述解决方案,我还将下一个金额以及下一个日期添加到分析查询中。然后,我在克隆步骤中添加了标志,并将输入的原始结果过滤到 Dummy 并将生成的结果(从计算器)过滤到计算器(此时)。然后,可能地,我可以将该单独的流转储到数据库中的临时 table 和 运行 将执行滚动减法的 sql 查询。我也在研究 javascript 步骤。

我忽略了 Python 或 R Executor 步骤,因为最后我将 运行 在 aws vm 上完成作业,我已经预见到安装过程中会遇到的痛苦。

你有什么建议?有没有简单的插值方法?

已更新问题

你的 link 中提供的方法在我的测试中确实有效,(不过我使用 LAG 而不是 LEAD 来完成你的任务)。在这里,我不想复制该方法,只是通过使用 JavaScript 构建您也可以扩展到其他应用程序的逻辑的另一种选择:

在下面的测试中(在 PDI-8.0 上测试),转换有 5 个步骤,见下文

  1. Data Grid 创建包含三个字段的测试数据的步骤:dateaccount numberamount
  2. 行排序 根据account numberdate 对行进行排序。这是 Analytic Query 步骤所必需的,如果您的源数据已经排序,则跳过此步骤
  3. Analytic Query步骤,见下文,再创建两个字段:prev_dateprev_amount

  1. 修改JavaScript Value步骤,添加如下代码,此步骤无需配置:

    var days_diff = dateDiff(prev_date, date, "d")
    
    if (days_diff > 0) {
        /* retrieve index for two fields: 'date', 'amount' 
         * and modify their values accordingly
         */
        var idx_date = getInputRowMeta().indexOfValue("date")
        var idx_amount = getInputRowMeta().indexOfValue("amount")
    
        /* amount to increment by each row */
        var delta_amount = (amount - prev_amount)/days_diff
    
        for (var i = 1; i < days_diff; i++) {
            newRow = createRowCopy(getOutputRowMeta().size());
            newRow[idx_date]   = dateAdd(prev_date, "d", i);
            newRow[idx_amount] = prev_amount + delta_amount * i;
            putRow(newRow);
        } 
    }
    
  2. Select values 步骤删除不需要的字段,即:prev_date、prev_amount

运行 转换,您将在 Modified Java Script Value 步骤的 Preview data 选项卡下显示以下内容:

更新:

根据您的评论,您可以执行以下操作,假设您有一个新字段 account_type

  1. Analytic Query步骤中,添加一个新字段prev_account_type,类似于另外两个prev_字段,只是来自不同的Subject : account_type

  2. Modified Java Script Value步骤,需要检索account_type的Row index,修改计算逻辑delta_amount,所以当prev_account_type与当前的account_type不同时,delta_amount为0,见下面代码:

    var days_diff = dateDiff(prev_date, date, "d")
    
    if (days_diff > 0) {
        /* retrieve index for three fields: 'date', 'amount', 'account_type' */
        var idx_date     = getInputRowMeta().indexOfValue("date")
        var idx_amount   = getInputRowMeta().indexOfValue("amount")
        var idx_act_type = getInputRowMeta().indexOfValue("account_type")
    
        /* amount to increment by each row */
        var delta_amount = prev_account_type.equals(account_type) ? (amount - prev_amount)/days_diff : 0;
    
        /* copy the current Row into newRow and modify fields accordingly */
        for (var i = 1; i < days_diff; i++) {
            newRow = createRowCopy(getOutputRowMeta().size());
            newRow[idx_date]     = dateAdd(prev_date, "d", i);
            newRow[idx_amount]   = prev_amount + delta_amount * i;
            newRow[idx_act_type] = prev_account_type;
            putRow(newRow);
        } 
    }
    

注意: 调用 Java 脚本解释器确实会影响性能,所以如果这对您很重要,请坚持使用 link 中的方法提供。