Spark RAPIDS - 操作未替换为 GPU 版本
Spark RAPIDS - Operation not replaced with GPU version
我是 Rapids 的新手,我无法理解支持的操作。
我有以下格式的数据:
+------------+----------+
| kmer|source_seq|
+------------+----------+
|TGTCGGTTTAA$| 4|
|ACCACCACCAC$| 8|
|GCATAATTTCC$| 1|
|CCGTCAAAGCG$| 7|
|CCGTCCCGTGG$| 6|
|GCGCTGTTATG$| 2|
|GAGCATAGGTG$| 5|
|CGGCGGATTCT$| 0|
|GGCGCGAGGGT$| 3|
|CCACCACCAC$A| 8|
|CACCACCAC$AA| 8|
|CCCAAAAAAAAA| 0|
|AAGAAAAAAAAA| 5|
|AAGAAAAAAAAA| 0|
|TGTAAAAAAAAA| 0|
|CCACAAAAAAAA| 8|
|AGACAAAAAAAA| 7|
|CCCCAAAAAAAA| 0|
|CAAGAAAAAAAA| 5|
|TAAGAAAAAAAA| 0|
+------------+----------+
我正在尝试使用以下代码找出哪些“kmer”具有哪些“source_seq”:
val w = Window.partitionBy("kmer")
x.withColumn("source_seqs", collect_list("source_seq").over(w))
// Result is something like this:
+------------+----------+-----------+
| kmer|source_seq|source_seqs|
+------------+----------+-----------+
|AAAACAAGACCA| 2| [2]|
|AAAACAAGCAGC| 4| [4]|
|AAAACCACGAGC| 3| [3]|
|AAAACCGCCAAA| 7| [7]|
|AAAACCGGTGTG| 1| [1]|
|AAAACCTATATC| 5| [5]|
|AAAACGACTTCT| 6| [6]|
|AAAACGCGCAAG| 3| [3]|
|AAAAGGCCTATT| 7| [7]|
|AAAAGGCGTTCG| 3| [3]|
|AAAAGGCTGTGA| 1| [1]|
|AAAAGGTCTACC| 2| [2]|
|AAAAGTCGAGCA| 7| [7, 0]|
|AAAAGTCGAGCA| 0| [7, 0]|
|AAAATCCGATCA| 0| [0]|
|AAAATCGAGCGG| 0| [0]|
|AAAATCGTTGAA| 7| [7]|
|AAAATGGACAAG| 1| [1]|
|AAAATTGCACCA| 3| [3]|
|AAACACCGCCGT| 3| [3]|
+------------+----------+-----------+
The Spark Rapids supported operators documentation 提到 collect_list
仅由窗口支持,据我所知,这就是我在代码中所做的。
但是,查看查询计划,很容易看出 collect_list
没有被 GPU 执行:
scala> x.withColumn("source_seqs", collect_list("source_seq").over(w)).explain
== Physical Plan ==
Window [collect_list(source_seq#302L, 0, 0) windowspecdefinition(kmer#301, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS max_source#658], [kmer#301]
+- GpuColumnarToRow false
+- GpuSort [kmer#301 ASC NULLS FIRST], false, RequireSingleBatch, 0
+- GpuCoalesceBatches RequireSingleBatch
+- GpuShuffleCoalesce 2147483647
+- GpuColumnarExchange gpuhashpartitioning(kmer#301, 200), ENSURE_REQUIREMENTS, [id=#1496]
+- GpuFileGpuScan csv [kmer#301,source_seq#302L] Batched: true, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[file:/home/cloud-user/phase1/example/1620833755/part-00000], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<kmer:string,source_seq:bigint>
与具有不同功能的类似查询不同,在这里我们可以看到使用 GPU 执行的窗口:
scala> x.withColumn("min_source", min("source_seq").over(w)).explain
== Physical Plan ==
GpuColumnarToRow false
+- GpuWindow [gpumin(source_seq#302L) gpuwindowspecdefinition(kmer#301, gpuspecifiedwindowframe(RowFrame, gpuspecialframeboundary(unboundedpreceding$()), gpuspecialframeboundary(unboundedfollowing$()))) AS max_source#648L], [kmer#301], false
+- GpuSort [kmer#301 ASC NULLS FIRST], false, RequireSingleBatch, 0
+- GpuCoalesceBatches RequireSingleBatch
+- GpuShuffleCoalesce 2147483647
+- GpuColumnarExchange gpuhashpartitioning(kmer#301, 200), ENSURE_REQUIREMENTS, [id=#1431]
+- GpuFileGpuScan csv [kmer#301,source_seq#302L] Batched: true, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[file:/home/cloud-user/phase1/example/1620833755/part-00000], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<kmer:string,source_seq:bigint>
我是不是对支持的操作文档理解有误,还是我以错误的方式编写了代码?如有任何帮助,我们将不胜感激。
肯尼。请问您使用的是什么版本的 rapids-4-spark
插件,以及 Spark 的版本?
COLLECT_LIST()
的初始 GPU 实现默认被禁用,因为它的行为与 Spark 的 w.r.t 空值不匹配。 (GPU 版本在聚合数组行中保留空值,而 Spark 删除它们。)编辑:该行为在 0.5 版本中得到纠正。
如果您的聚合列中没有空值(并且正在使用 rapids-4-spark
0.4),您可以尝试通过设置 spark.rapids.sql.expression.CollectList=true
.
来启用运算符
一般情况下,可以通过设置spark.rapids.sql.explain=NOT_ON_GPU
来查看算子在GPU上没有运行的原因。那应该将原因打印到控制台。
如果您在使用 rapids-4-spark
插件时仍然遇到困难或不正确的行为,请随时在 the project's GitHub 上提出错误。我们很乐意进一步调查。
是的,正如 Mithun 提到的,spark.rapids.sql.expression.CollectList 从 0.5 版本开始是正确的。
然而在 0.4 版本中它是错误的:
https://github.com/NVIDIA/spark-rapids/blob/branch-0.4/docs/configs.md
这是我在 0.5+ 版本上测试的计划:
val w = Window.partitionBy("name")
val resultdf=dfread.withColumn("values", collect_list("value").over(w))
resultdf.explain
== Physical Plan ==
GpuColumnarToRow false
+- GpuWindow [collect_list(value#134L, 0, 0) gpuwindowspecdefinition(name#133, gpuspecifiedwindowframe(RowFrame, gpuspecialframeboundary(unboundedpreceding$()), gpuspecialframeboundary(unboundedfollowing$()))) AS values#138], [name#133], false
+- GpuCoalesceBatches RequireSingleBatch
+- GpuSort [name#133 ASC NULLS FIRST], false, com.nvidia.spark.rapids.OutOfCoreSort$@28e73bd1
+- GpuShuffleCoalesce 2147483647
+- GpuColumnarExchange gpuhashpartitioning(name#133, 200), ENSURE_REQUIREMENTS, [id=#563]
+- GpuFileGpuScan csv [name#133,value#134L] Batched: true, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[file:/tmp/df], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<name:string,value:bigint>
collect_set 聚合和窗口将在即将发布的 21.08 版本中得到支持(RAPIDS Spark 正在转向日历版本控制)。
我是 Rapids 的新手,我无法理解支持的操作。
我有以下格式的数据:
+------------+----------+
| kmer|source_seq|
+------------+----------+
|TGTCGGTTTAA$| 4|
|ACCACCACCAC$| 8|
|GCATAATTTCC$| 1|
|CCGTCAAAGCG$| 7|
|CCGTCCCGTGG$| 6|
|GCGCTGTTATG$| 2|
|GAGCATAGGTG$| 5|
|CGGCGGATTCT$| 0|
|GGCGCGAGGGT$| 3|
|CCACCACCAC$A| 8|
|CACCACCAC$AA| 8|
|CCCAAAAAAAAA| 0|
|AAGAAAAAAAAA| 5|
|AAGAAAAAAAAA| 0|
|TGTAAAAAAAAA| 0|
|CCACAAAAAAAA| 8|
|AGACAAAAAAAA| 7|
|CCCCAAAAAAAA| 0|
|CAAGAAAAAAAA| 5|
|TAAGAAAAAAAA| 0|
+------------+----------+
我正在尝试使用以下代码找出哪些“kmer”具有哪些“source_seq”:
val w = Window.partitionBy("kmer")
x.withColumn("source_seqs", collect_list("source_seq").over(w))
// Result is something like this:
+------------+----------+-----------+
| kmer|source_seq|source_seqs|
+------------+----------+-----------+
|AAAACAAGACCA| 2| [2]|
|AAAACAAGCAGC| 4| [4]|
|AAAACCACGAGC| 3| [3]|
|AAAACCGCCAAA| 7| [7]|
|AAAACCGGTGTG| 1| [1]|
|AAAACCTATATC| 5| [5]|
|AAAACGACTTCT| 6| [6]|
|AAAACGCGCAAG| 3| [3]|
|AAAAGGCCTATT| 7| [7]|
|AAAAGGCGTTCG| 3| [3]|
|AAAAGGCTGTGA| 1| [1]|
|AAAAGGTCTACC| 2| [2]|
|AAAAGTCGAGCA| 7| [7, 0]|
|AAAAGTCGAGCA| 0| [7, 0]|
|AAAATCCGATCA| 0| [0]|
|AAAATCGAGCGG| 0| [0]|
|AAAATCGTTGAA| 7| [7]|
|AAAATGGACAAG| 1| [1]|
|AAAATTGCACCA| 3| [3]|
|AAACACCGCCGT| 3| [3]|
+------------+----------+-----------+
The Spark Rapids supported operators documentation 提到 collect_list
仅由窗口支持,据我所知,这就是我在代码中所做的。
但是,查看查询计划,很容易看出 collect_list
没有被 GPU 执行:
scala> x.withColumn("source_seqs", collect_list("source_seq").over(w)).explain
== Physical Plan ==
Window [collect_list(source_seq#302L, 0, 0) windowspecdefinition(kmer#301, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS max_source#658], [kmer#301]
+- GpuColumnarToRow false
+- GpuSort [kmer#301 ASC NULLS FIRST], false, RequireSingleBatch, 0
+- GpuCoalesceBatches RequireSingleBatch
+- GpuShuffleCoalesce 2147483647
+- GpuColumnarExchange gpuhashpartitioning(kmer#301, 200), ENSURE_REQUIREMENTS, [id=#1496]
+- GpuFileGpuScan csv [kmer#301,source_seq#302L] Batched: true, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[file:/home/cloud-user/phase1/example/1620833755/part-00000], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<kmer:string,source_seq:bigint>
与具有不同功能的类似查询不同,在这里我们可以看到使用 GPU 执行的窗口:
scala> x.withColumn("min_source", min("source_seq").over(w)).explain
== Physical Plan ==
GpuColumnarToRow false
+- GpuWindow [gpumin(source_seq#302L) gpuwindowspecdefinition(kmer#301, gpuspecifiedwindowframe(RowFrame, gpuspecialframeboundary(unboundedpreceding$()), gpuspecialframeboundary(unboundedfollowing$()))) AS max_source#648L], [kmer#301], false
+- GpuSort [kmer#301 ASC NULLS FIRST], false, RequireSingleBatch, 0
+- GpuCoalesceBatches RequireSingleBatch
+- GpuShuffleCoalesce 2147483647
+- GpuColumnarExchange gpuhashpartitioning(kmer#301, 200), ENSURE_REQUIREMENTS, [id=#1431]
+- GpuFileGpuScan csv [kmer#301,source_seq#302L] Batched: true, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[file:/home/cloud-user/phase1/example/1620833755/part-00000], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<kmer:string,source_seq:bigint>
我是不是对支持的操作文档理解有误,还是我以错误的方式编写了代码?如有任何帮助,我们将不胜感激。
肯尼。请问您使用的是什么版本的 rapids-4-spark
插件,以及 Spark 的版本?
COLLECT_LIST()
的初始 GPU 实现默认被禁用,因为它的行为与 Spark 的 w.r.t 空值不匹配。 (GPU 版本在聚合数组行中保留空值,而 Spark 删除它们。)编辑:该行为在 0.5 版本中得到纠正。
如果您的聚合列中没有空值(并且正在使用 rapids-4-spark
0.4),您可以尝试通过设置 spark.rapids.sql.expression.CollectList=true
.
一般情况下,可以通过设置spark.rapids.sql.explain=NOT_ON_GPU
来查看算子在GPU上没有运行的原因。那应该将原因打印到控制台。
如果您在使用 rapids-4-spark
插件时仍然遇到困难或不正确的行为,请随时在 the project's GitHub 上提出错误。我们很乐意进一步调查。
是的,正如 Mithun 提到的,spark.rapids.sql.expression.CollectList 从 0.5 版本开始是正确的。 然而在 0.4 版本中它是错误的: https://github.com/NVIDIA/spark-rapids/blob/branch-0.4/docs/configs.md
这是我在 0.5+ 版本上测试的计划:
val w = Window.partitionBy("name")
val resultdf=dfread.withColumn("values", collect_list("value").over(w))
resultdf.explain
== Physical Plan ==
GpuColumnarToRow false
+- GpuWindow [collect_list(value#134L, 0, 0) gpuwindowspecdefinition(name#133, gpuspecifiedwindowframe(RowFrame, gpuspecialframeboundary(unboundedpreceding$()), gpuspecialframeboundary(unboundedfollowing$()))) AS values#138], [name#133], false
+- GpuCoalesceBatches RequireSingleBatch
+- GpuSort [name#133 ASC NULLS FIRST], false, com.nvidia.spark.rapids.OutOfCoreSort$@28e73bd1
+- GpuShuffleCoalesce 2147483647
+- GpuColumnarExchange gpuhashpartitioning(name#133, 200), ENSURE_REQUIREMENTS, [id=#563]
+- GpuFileGpuScan csv [name#133,value#134L] Batched: true, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[file:/tmp/df], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<name:string,value:bigint>
collect_set 聚合和窗口将在即将发布的 21.08 版本中得到支持(RAPIDS Spark 正在转向日历版本控制)。