在 Data Fusion 中应用 Rank 或 partitioned row_num 函数

Apply Rank or partitioned row_num function in Data Fusion

我想在 Data Fusion 中对我的数据实施排名或分区 row_num 功能,但我找不到任何插件可以这样做。

有什么办法可以得到这个吗?

我想实现下面的,

假设我有上面的数据,现在我想根据 AccountNumber 对数据进行分组,并将最新的记录发送到一个接收器中,其余的发送给其他接收器。 所以从上面的数据来看,

Sink1 预计有,

接收器 2 ,

我正计划通过按 AccountNumber 应用排名或 row_number 分区并按 Record_date desc 之类的功能进行排序并发送具有 rank=1 或 [=34= 的记录来进行此隔离]=1 到一个水槽,休息到另一个。

解决您的问题的一个好方法是使用 Spark plugin。 为了将它添加到您的 Datafusion 实例,转到 HUB -> Plugins -> Search for Spark -> Deploy the plugin 。然后你可以在 Analytics 选项卡。

为了举例说明如何使用它,我在下面创建了管道:

这条管道基本上是:

  1. 从 GCS 读取文件。
  2. 在您的数据中执行排名函数
  3. 在不同的分支中过滤rank=1和rank>1的数据
  4. 将您的数据保存在不同的位置

现在让我们更深入地了解每个组件:

1 - GCS:这是一个简单的 GCS 源。用于此示例的文件具有如下所示的数据

2 - Spark_rank:这是一个 Spark 插件,代码如下。该代码基本上使用您的数据创建了一个临时视图,并且它们应用查询来对您的行进行排名。之后,您的数据返回到管道。您还可以在下面看到此步骤的输入和输出数据。请注意输出是重复的,因为它被传送到两个分支。

      def transform(df: DataFrame, context: SparkExecutionPluginContext) : DataFrame = {
          df.createTempView("source")
          df.sparkSession.sql("SELECT AccountNumber, Address, Record_date, RANK() OVER (PARTITION BY accountNumber ORDER BY record_date DESC) as rank FROM source")
    }

3 - Spark2Spark3:和下面的步骤一样,这一步使用Spark插件来转换数据。 Spark2 使用下面的代码只获取 rank = 1 的数据

    def transform(df: DataFrame, context: SparkExecutionPluginContext) : DataFrame = {
      df.createTempView("source_0")
      df.sparkSession.sql("SELECT AccountNumber, Address, Record_date FROM 
    source_0 WHERE rank = 1")
    }

Spark3 使用以下代码获取排名 > 1 的数据:

    def transform(df: DataFrame, context: SparkExecutionPluginContext) : DataFrame = {
      df.createTempView("source_1")
      df.sparkSession.sql("SELECT accountNumber, address, record_date FROM source_1 WHERE rank > 1")
    }

4 - GCS2GCS3:最后,在这一步中,您的数据将再次保存到 GCS 中。