使用 spark sql 从前 3 个分区获取数据

Get data from first 3 partitions using spark sql

我有以下 table 并且想使用 scala spark

获得前 n 个唯一名称以及其余列
+--------------------+--------------------+--------+----------+
|            Name.   |               Type |cs_Units|cs1_Units |
+--------------------+--------------------+--------+----------+
|AUTO.,AUTO.ACCESS...|BACHMAN-BERNARD C...|       4|      8|
|AUTO.,AUTO.ACCESS...|CAVENAUGHS BRUCE ...|       1|       |
|AUTO.,AUTO.ACCESS...|SCOTT CHANTZ KIA ...|       2|       |
|BUSINESS & CONSUM...|WILLIAMS JIM & AS...|      11|      8|
|BUSINESS & CONSUM...|OBRIEN SVC CO HEA...|       6|      9|
|BUSINESS & CONSUM...|TOUCHSTONE ENERGY...|       5|      5|
|BUSINESS & CONSUM...|FOX & FARMER LEGA...|       2|      2|
|BUSINESS & CONSUM...|CANADY & SON EXTE...|       1|       | 
|DIRECT RESPONSE P...|MYPILLOW PREMIUM ...|       2|      6|
|DIRECT RESPONSE P...|DERMASUCTION DIR ...|       1|       |
|DIRECT RESPONSE P...|GREASE POLICE DIR...|       1|       |
|XXXX.               |GREASE POLICE DIR...|       1|       |.  
+--------------------+--------------------+--------+----------+

最终结果:如果您看到它只有 3 个唯一的“名称”。

1)自动.,AUTO.ACCESS

2)商业与消费

3) 直接响应 P

    +--------------------+--------------------+--------+----------+
    |            Name.   |               Type |cs_Units|cs1_Units |
    +--------------------+--------------------+--------+----------+
    |AUTO.,AUTO.ACCESS...|BACHMAN-BERNARD C...|       4|      8|
    |AUTO.,AUTO.ACCESS...|CAVENAUGHS BRUCE ...|       1|       |
    |AUTO.,AUTO.ACCESS...|SCOTT CHANTZ KIA ...|       2|       |
    |BUSINESS & CONSUM...|WILLIAMS JIM & AS...|      11|      8|
    |BUSINESS & CONSUM...|OBRIEN SVC CO HEA...|       6|      9|
    |BUSINESS & CONSUM...|TOUCHSTONE ENERGY...|       5|      5|
    |BUSINESS & CONSUM...|FOX & FARMER LEGA...|       2|      2|
    |BUSINESS & CONSUM...|CANADY & SON EXTE...|       1|       | 
    |DIRECT RESPONSE P...|MYPILLOW PREMIUM ...|       2|      6|
    |DIRECT RESPONSE P...|DERMASUCTION DIR ...|       1|       |
    |DIRECT RESPONSE P...|GREASE POLICE DIR...|       1|       |
    +--------------------+--------------------+--------+----------+

这是一个代码示例,但我认为您可以限制 distinct(dropDuplicates) 并将内容重新加入。

val df = sc.parallelize(Seq(
  (0,"cat26",30.9), (0,"cat13",22.1), (0,"cat95",19.6), (0,"cat105",1.3),
  (1,"cat67",28.5), (1,"cat4",26.8), (1,"cat13",12.6), (1,"cat23",5.3),
  (2,"cat56",39.6), (2,"cat40",29.7), (2,"cat187",27.9), (2,"cat68",9.8),
  (3,"cat8",35.6))).toDF("Hour", "Category", "TotalValue")

val distinkt = df.select( df("Category") ).dropDuplicates.limit(3);
df.join( distinkt, distinkt("Category") === df("Category") ).show

如果您对数据了解更多,您可能会想出一个策略来重新分区数据并使用 foreachPartition。但是你必须有一些下一级逻辑才能知道哪个分区是 printed/skipped。这是可行的,但我不确定您会获得什么样的性能提升。