Get data from first 3 partitions using Spark SQL
I have the following table and want to get the first n unique Names, along with the rest of the columns, using Scala Spark.
+--------------------+--------------------+--------+----------+
|                Name|                Type|cs_Units| cs1_Units|
+--------------------+--------------------+--------+----------+
|AUTO.,AUTO.ACCESS...|BACHMAN-BERNARD C...|       4|         8|
|AUTO.,AUTO.ACCESS...|CAVENAUGHS BRUCE ...|       1|          |
|AUTO.,AUTO.ACCESS...|SCOTT CHANTZ KIA ...|       2|          |
|BUSINESS & CONSUM...|WILLIAMS JIM & AS...|      11|         8|
|BUSINESS & CONSUM...|OBRIEN SVC CO HEA...|       6|         9|
|BUSINESS & CONSUM...|TOUCHSTONE ENERGY...|       5|         5|
|BUSINESS & CONSUM...|FOX & FARMER LEGA...|       2|         2|
|BUSINESS & CONSUM...|CANADY & SON EXTE...|       1|          |
|DIRECT RESPONSE P...|MYPILLOW PREMIUM ...|       2|         6|
|DIRECT RESPONSE P...|DERMASUCTION DIR ...|       1|          |
|DIRECT RESPONSE P...|GREASE POLICE DIR...|       1|          |
|               XXXX.|GREASE POLICE DIR...|       1|          |
+--------------------+--------------------+--------+----------+
Final result: as you can see, there are only 3 unique Names:
1) AUTO.,AUTO.ACCESS
2) BUSINESS & CONSUM
3) DIRECT RESPONSE P
+--------------------+--------------------+--------+----------+
|                Name|                Type|cs_Units| cs1_Units|
+--------------------+--------------------+--------+----------+
|AUTO.,AUTO.ACCESS...|BACHMAN-BERNARD C...|       4|         8|
|AUTO.,AUTO.ACCESS...|CAVENAUGHS BRUCE ...|       1|          |
|AUTO.,AUTO.ACCESS...|SCOTT CHANTZ KIA ...|       2|          |
|BUSINESS & CONSUM...|WILLIAMS JIM & AS...|      11|         8|
|BUSINESS & CONSUM...|OBRIEN SVC CO HEA...|       6|         9|
|BUSINESS & CONSUM...|TOUCHSTONE ENERGY...|       5|         5|
|BUSINESS & CONSUM...|FOX & FARMER LEGA...|       2|         2|
|BUSINESS & CONSUM...|CANADY & SON EXTE...|       1|          |
|DIRECT RESPONSE P...|MYPILLOW PREMIUM ...|       2|         6|
|DIRECT RESPONSE P...|DERMASUCTION DIR ...|       1|          |
|DIRECT RESPONSE P...|GREASE POLICE DIR...|       1|          |
+--------------------+--------------------+--------+----------+
Here is a code example, but I think you can limit the distinct set (dropDuplicates) and join it back:
// sample data: (Hour, Category, TotalValue)
val df = sc.parallelize(Seq(
  (0,"cat26",30.9), (0,"cat13",22.1), (0,"cat95",19.6), (0,"cat105",1.3),
  (1,"cat67",28.5), (1,"cat4",26.8), (1,"cat13",12.6), (1,"cat23",5.3),
  (2,"cat56",39.6), (2,"cat40",29.7), (2,"cat187",27.9), (2,"cat68",9.8),
  (3,"cat8",35.6))).toDF("Hour", "Category", "TotalValue")

// take 3 distinct Category values, then join back to keep all of their rows
val distinkt = df.select(df("Category")).dropDuplicates.limit(3)
df.join(distinkt, distinkt("Category") === df("Category")).show
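Applied to the table in the question, the same pattern would look roughly like this. This is only a sketch: it assumes the data is already loaded into a DataFrame named input with the columns shown above, and note that without an orderBy the "first" n names are not deterministic.

// Hypothetical: `input` is the question's DataFrame (Name, Type, cs_Units, cs1_Units)
val n = 3
// n distinct Name values (order is not guaranteed without an orderBy)
val firstNames = input.select(input("Name")).dropDuplicates.limit(n)
// keep every row whose Name is among those n values
input.join(firstNames, Seq("Name")).show(false)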
If you know more about your data, you might be able to come up with a strategy to repartition the data and use foreachPartition. But you would need some next-level logic to know which partition gets printed/skipped. It is doable, but I am not sure what kind of performance gain you would get.
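For completeness, a very rough sketch of that repartition idea, again assuming the hypothetical input DataFrame. Hash partitioning by Name puts all rows of a given Name into the same partition, but gives you no control over which names share a partition; that is exactly the extra logic mentioned above.

// Hypothetical sketch only -- not a recommendation.
// Group all rows of a Name into the same partition via hash partitioning.
val byName = input.repartition(10, input("Name"))
// foreachPartition runs on the executors; each iterator holds complete Name groups,
// but deciding which partitions to keep or skip still needs extra logic.
byName.rdd.foreachPartition { rows =>
  rows.foreach(println)
}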