How to pivot a streaming Dataset?

I am trying to pivot a Spark streaming Dataset (Structured Streaming), but I get an AnalysisException (excerpt below).

Could somebody confirm that pivoting is indeed not supported in Structured Streaming (Spark 2.0), and perhaps suggest an alternative approach?

Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
kafka
  at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:297)
  at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch.apply(UnsupportedOperationChecker.scala:36)
  at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch.apply(UnsupportedOperationChecker.scala:34)
  at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)

tl;dr The pivot aggregation is not directly supported by Spark Structured Streaming, up to and including 2.4.4.

As a workaround, use DataStreamWriter.foreachBatch or the more generic DataStreamWriter.foreach.
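
A minimal Scala sketch of the foreachBatch workaround (available since Spark 2.4); the events stream, the column names and the per-batch logic below are placeholders, not part of the original question:

import org.apache.spark.sql.DataFrame

// Each micro-batch is handed to the function as a plain (batch) DataFrame,
// where pivot is supported as usual.
val pivotEachBatch: (DataFrame, Long) => Unit = (batch, batchId) =>
  batch
    .groupBy("customer_id")
    .pivot("product")
    .sum("price")
    .show()

events                          // hypothetical streaming DataFrame
  .writeStream
  .foreachBatch(pivotEachBatch)
  .start()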


I am using Spark 2.4.4, the latest version at the time of writing.

scala> spark.version
res0: String = 2.4.4

UnsupportedOperationChecker (which you can find in the stack trace) checks whether (the logical plan of) a streaming query uses only supported operations.

When you do pivot you have to groupBy first, as that is the only interface that gives you access to pivot.

pivot has two issues:

  1. pivot wants to know how many columns to generate values for, and hence does a collect, which is not possible with a streaming Dataset.

  2. pivot is actually another aggregation (next to groupBy) that Spark Structured Streaming does not support.

Let's look at issue 1, where no columns to pivot on are defined.

val sq = spark
  .readStream
  .format("rate")
  .load
  .groupBy("value")
  .pivot("timestamp") // <-- pivot with no values
  .count
  .writeStream
  .format("console")
scala> sq.start
org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
rate
  at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.throwError(UnsupportedOperationChecker.scala:389)
  at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.$anonfun$checkForBatch(UnsupportedOperationChecker.scala:38)
  at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.$anonfun$checkForBatch$adapted(UnsupportedOperationChecker.scala:36)
  at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp(TreeNode.scala:126)
  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$adapted(TreeNode.scala:126)
  at scala.collection.immutable.List.foreach(List.scala:392)
  at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp(TreeNode.scala:126)
  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$adapted(TreeNode.scala:126)
  at scala.collection.immutable.List.foreach(List.scala:392)
  at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp(TreeNode.scala:126)
  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$adapted(TreeNode.scala:126)
  at scala.collection.immutable.List.foreach(List.scala:392)
  at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp(TreeNode.scala:126)
  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$adapted(TreeNode.scala:126)
  at scala.collection.immutable.List.foreach(List.scala:392)
  at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp(TreeNode.scala:126)
  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$adapted(TreeNode.scala:126)
  at scala.collection.immutable.List.foreach(List.scala:392)
  at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
  at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForBatch(UnsupportedOperationChecker.scala:36)
  at org.apache.spark.sql.execution.QueryExecution.assertSupported(QueryExecution.scala:51)
  at org.apache.spark.sql.execution.QueryExecution.withCachedData$lzycompute(QueryExecution.scala:62)
  at org.apache.spark.sql.execution.QueryExecution.withCachedData(QueryExecution.scala:60)
  at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:66)
  at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:66)
  at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:72)
  at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:68)
  at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:77)
  at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:77)
  at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3365)
  at org.apache.spark.sql.Dataset.collect(Dataset.scala:2788)
  at org.apache.spark.sql.RelationalGroupedDataset.pivot(RelationalGroupedDataset.scala:384)
  ... 49 elided

The last two lines show the issue, i.e. pivot does a collect under the covers, and hence the issue.
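
For contrast, the very same chain works on a static (batch) Dataset, because there pivot can safely collect the distinct pivot values. A minimal batch-mode sketch with made-up data (the toy columns are mine, not from the answer):

import spark.implicits._

// Batch mode: collecting the distinct values of "name" is not a problem here.
val batch = Seq((0L, "a"), (0L, "b"), (1L, "a")).toDF("value", "name")

batch
  .groupBy("value")   // groupBy gives the RelationalGroupedDataset that exposes pivot
  .pivot("name")      // collects the distinct values of "name"
  .count
  .show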

The other issue is that even if you specified the values of the column to pivot on, you would then run into the other issue due to multiple aggregations (and you can see that it is actually a check for streaming, not batch as happened in the first case).

val sq = spark
  .readStream
  .format("rate")
  .load
  .groupBy("value")
  .pivot("timestamp", Seq(1)) // <-- pivot with explicit values
  .count
  .writeStream
  .format("console")
scala> sq.start
org.apache.spark.sql.AnalysisException: Multiple streaming aggregations are not supported with streaming DataFrames/Datasets;;
Project [value#128L, __pivot_count(1) AS `count` AS `count(1) AS ``count```#141[0] AS 1#142L]
+- Aggregate [value#128L], [value#128L, pivotfirst(timestamp#127, count(1) AS `count`#137L, 1000000, 0, 0) AS __pivot_count(1) AS `count` AS `count(1) AS ``count```#141]
   +- Aggregate [value#128L, timestamp#127], [value#128L, timestamp#127, count(1) AS count(1) AS `count`#137L]
      +- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@5dd63368,rate,List(),None,List(),None,Map(),None), rate, [timestamp#127, value#128L]

  at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.throwError(UnsupportedOperationChecker.scala:389)
  at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForStreaming(UnsupportedOperationChecker.scala:93)
  at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:250)
  at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:326)
  at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:325)
  ... 49 elided

Here is a simple Java example based on Jacek's answer above:

JSON array:

[{
        "customer_id": "d6315a00",
        "product": "Super widget",
        "price": 10,
        "bought_date": "2019-01-01"
    },
    {
        "customer_id": "d6315a00",
        "product": "Super widget",
        "price": 10,
        "bought_date": "2019-01-01"
    },
    {
        "customer_id": "d6315a00",
        "product": "Super widget",
        "price": 10,
        "bought_date": "2019-01-02"
    },
    {
        "customer_id": "d6315a00",
        "product": "Food widget",
        "price": 4,
        "bought_date": "2019-08-20"
    },
    {
        "customer_id": "d6315cd0",
        "product": "Food widget",
        "price": 4,
        "bought_date": "2019-09-19"
    }, {
        "customer_id": "d6315e2e",
        "product": "Bike widget",
        "price": 10,
        "bought_date": "2019-01-01"
    }, {
        "customer_id": "d6315a00",
        "product": "Bike widget",
        "price": 10,
        "bought_date": "2019-03-10"
    },
    {
        "customer_id": "d631614e",
        "product": "Garage widget",
        "price": 4,
        "bought_date": "2019-02-15"
    }
]

Java code:

package io.centilliard;

import static org.apache.spark.sql.functions.explode;
import static org.apache.spark.sql.functions.from_json;

import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.DataStreamWriter;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.types.ArrayType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import scala.Function2;
import scala.runtime.BoxedUnit;

public class Pivot {

    public static void main(String[] args) throws StreamingQueryException, AnalysisException {

        StructType schema = new StructType(new StructField[]{
                new StructField("customer_id", DataTypes.StringType, false, Metadata.empty()),  
                new StructField("product", DataTypes.StringType, false, Metadata.empty()),          
                new StructField("price", DataTypes.IntegerType, false, Metadata.empty()),               
                new StructField("bought_date", DataTypes.StringType, false, Metadata.empty())
            });

        ArrayType  arrayType = new ArrayType(schema, false);

        SparkSession spark = SparkSession
                .builder()
                .appName("SimpleExample")
                .getOrCreate();

        // Create a DataSet representing the stream of input lines from Kafka
        Dataset<Row> dataset = spark
                        .readStream()
                        .format("kafka")                
                        .option("kafka.bootstrap.servers", "localhost:9092")
                        .option("subscribe", "utilization")
                        .load()
                        .selectExpr("CAST(value AS STRING) as json");

        Column col = new Column("json");        
        Column data = from_json(col,arrayType).as("data");  
        Column explode = explode(data);
        Dataset<Row> customers = dataset.select(explode).select("col.*");

        // Equivalent to customers.writeStream()
        DataStreamWriter<Row> dataStreamWriter = new DataStreamWriter<Row>(customers);

        StreamingQuery dataStream = dataStreamWriter.foreachBatch(new Function2<Dataset<Row>, Object, BoxedUnit>() {

            @Override
            public BoxedUnit apply(Dataset<Row> dataset, Object object) {               

                dataset
                .groupBy("customer_id","product","bought_date")
                .pivot("product")               
                .sum("price")               
                .orderBy("customer_id")
                .show();

                return BoxedUnit.UNIT; // Scala's Unit value
            }
        })
        .start();

        dataStream.awaitTermination();
    }

}

Output:

+-----------+-------------+-----------+-----------+-----------+-------------+------------+
|customer_id|      product|bought_date|Bike widget|Food widget|Garage widget|Super widget|
+-----------+-------------+-----------+-----------+-----------+-------------+------------+
|   d6315a00|  Bike widget| 2019-03-10|         20|       null|         null|        null|
|   d6315a00| Super widget| 2019-01-02|       null|       null|         null|          20|
|   d6315a00| Super widget| 2019-01-01|       null|       null|         null|          40|
|   d6315a00|  Food widget| 2019-08-20|       null|          8|         null|        null|
|   d6315cd0|  Food widget| 2019-09-19|       null|          8|         null|        null|
|   d6315e2e|  Bike widget| 2019-01-01|         20|       null|         null|        null|
|   d631614e|Garage widget| 2019-02-15|       null|       null|            8|        null|
+-----------+-------------+-----------+-----------+-----------+-------------+------------+

In most cases you can use a conditional aggregation as a workaround. The equivalent of

df.groupBy("timestamp").
   pivot("name", Seq("banana", "peach")).
   sum("value")

is

// needs: import spark.implicits._
// needs: import org.apache.spark.sql.functions.{sum, when, lit}
// needs: import org.apache.spark.sql.types.IntegerType
df.filter($"name".isin(Seq("banana", "peach"): _*)).
   groupBy("timestamp").
   agg(
     sum(when($"name".equalTo("banana"), $"value").
         otherwise(lit(null))).
         cast(IntegerType).alias("banana"),
     sum(when($"name".equalTo("peach"), $"value").
         otherwise(lit(null))).
         cast(IntegerType).alias("peach")
   )
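
A single streaming aggregation like this is supported, so the conditional-aggregation workaround can run directly on a streaming DataFrame. A minimal sketch, assuming a hypothetical streaming source events with timestamp, name and value columns:

import org.apache.spark.sql.functions.{sum, when}
import org.apache.spark.sql.types.IntegerType
import spark.implicits._

// Only one streaming aggregation, so UnsupportedOperationChecker accepts the plan.
val pivoted = events
  .filter($"name".isin("banana", "peach"))
  .groupBy($"timestamp")
  .agg(
    sum(when($"name" === "banana", $"value")).cast(IntegerType).alias("banana"),
    sum(when($"name" === "peach", $"value")).cast(IntegerType).alias("peach")
  )

pivoted
  .writeStream
  .outputMode("complete")   // streaming aggregations require complete or update output mode
  .format("console")
  .start()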