反向补充pyspark中的字符串
Reverse complement a string in pyspark
我正在编写一个面向 pyspark 生物的应用程序,在其中一个步骤中,我有一个提取的 dna 序列的 spark 数据框。对于那些出现在负链中的,我想反补。
我能够使用 udf 执行任务,但我知道这会限制 sparks 的效率(特别是因为这是 pyspark)。这也会导致 OOM 问题。
反转字符串很容易,因为它是内置功能,但我找不到补充 dna 碱基的方法(A->T、G->C、N->N,...)。
有什么灵巧的方法 sql 吗?如果不是,是否有助于在 java 中实现它并在 python 中将其注册为 udf?
我 运行 使用 EMR 6.20,所以它基于 spark 3
编辑:
请求的示例数据。假设我有一个包含以下数据的数据框:
+------------+
| sequence|
+------------+
|ATTGCCATGCCA|
|GTTCGTTA |
|ATNNGGRRG |
+------------+
预期的输出应该是:
+------------+
| sequence|
+------------+
|TAACGGTACGGT|
|CAAGCAAT |
|TANNCCYYC |
+------------+
映射基于IUPAC notations for DNA,补码表示DNA配对的互补碱基(A<->T,G<->C)。
编辑(解决方案):
感谢@mck 的解决方案。假定大写序列的反向补码调用版本(否则只需添加小写选项)
from pyspark.sql import functions as F
df2 = df.withColumn(
'stranded_sequence',
F.translate(
F.reverse(F.col('sequence')),
'ACGTRYSWKMBDHVN',
'TGCAYRSWMKVHDBN'
)
)
如果你像我一样在 df 中有一个 strand
列,你甚至可以这样切换大小写:
df2 = df.withColumn(
'stranded_sequence',
F.when(
F.col('strand') == '-',
F.translate(
F.reverse(F.col('sequence')),
'ACGTRYSWKMBDHVN',
'TGCAYRSWMKVHDBN'
)
).otherwise(F.col('sequence'))
)
尝试 translate
:
import pyspark.sql.functions as F
df2 = df.withColumn('sequence', F.translate('sequence', 'ATCGRY', 'TAGCYR'))
df2.show()
+------------+
| sequence|
+------------+
|TAACGGTACGGT|
| CAAGCAAT|
| TANNCCYYC|
+------------+
为了考虑所有可能的基础,您可以将字符串扩展为类似
ATCGRYSWKM...
TAGCYRWSMK...
我正在编写一个面向 pyspark 生物的应用程序,在其中一个步骤中,我有一个提取的 dna 序列的 spark 数据框。对于那些出现在负链中的,我想反补。
我能够使用 udf 执行任务,但我知道这会限制 sparks 的效率(特别是因为这是 pyspark)。这也会导致 OOM 问题。
反转字符串很容易,因为它是内置功能,但我找不到补充 dna 碱基的方法(A->T、G->C、N->N,...)。
有什么灵巧的方法 sql 吗?如果不是,是否有助于在 java 中实现它并在 python 中将其注册为 udf?
我 运行 使用 EMR 6.20,所以它基于 spark 3
编辑: 请求的示例数据。假设我有一个包含以下数据的数据框:
+------------+
| sequence|
+------------+
|ATTGCCATGCCA|
|GTTCGTTA |
|ATNNGGRRG |
+------------+
预期的输出应该是:
+------------+
| sequence|
+------------+
|TAACGGTACGGT|
|CAAGCAAT |
|TANNCCYYC |
+------------+
映射基于IUPAC notations for DNA,补码表示DNA配对的互补碱基(A<->T,G<->C)。
编辑(解决方案): 感谢@mck 的解决方案。假定大写序列的反向补码调用版本(否则只需添加小写选项)
from pyspark.sql import functions as F
df2 = df.withColumn(
'stranded_sequence',
F.translate(
F.reverse(F.col('sequence')),
'ACGTRYSWKMBDHVN',
'TGCAYRSWMKVHDBN'
)
)
如果你像我一样在 df 中有一个 strand
列,你甚至可以这样切换大小写:
df2 = df.withColumn(
'stranded_sequence',
F.when(
F.col('strand') == '-',
F.translate(
F.reverse(F.col('sequence')),
'ACGTRYSWKMBDHVN',
'TGCAYRSWMKVHDBN'
)
).otherwise(F.col('sequence'))
)
尝试 translate
:
import pyspark.sql.functions as F
df2 = df.withColumn('sequence', F.translate('sequence', 'ATCGRY', 'TAGCYR'))
df2.show()
+------------+
| sequence|
+------------+
|TAACGGTACGGT|
| CAAGCAAT|
| TANNCCYYC|
+------------+
为了考虑所有可能的基础,您可以将字符串扩展为类似
ATCGRYSWKM...
TAGCYRWSMK...