有条件地 Combining/Reducing 密钥对
Conditionally Combining/Reducing key-pairs
我遇到这个问题已经有一段时间了,我认为这与我对如何使用 combineByKey 和 reduceByKey 缺乏了解有关,所以希望有人能解决这个问题。
我正在处理 DNA 序列,所以我有一个程序来生成它的一堆不同版本(向前、向后和称赞)。我有几个阅读框架,这意味着对于字符串 ABCABC
,我想要以下一系列键:ABC ABC
、A BCA BC
、AB CAB C
.
现在我正在使用以下函数来分解(我在 flatMap 过程中运行):
# Modified from
def chunkCodons((seq, strand, reading_frame)):
"""
Yield successive codons from seq
"""
# Get the first characters
if reading_frame > 0:
yield (0, seq[0:reading_frame], strand, reading_frame)
for i in xrange(reading_frame, len(seq), 3):
if i % 1000000 == 0:
print "Base # = {:,}".format(i)
yield (i, seq[i:i + 3], strand, reading_frame)
我运行这样的:reading_frames_rdd = nascent_reading_frames_rdd.flatMap(chunkCodons)
然而,这在一长串 DNA 上需要很长时间,所以我知道这一定是错的。
因此,我想让 Spark 以更直接的方式执行此操作,方法是按字符(即基础)分解它,然后一次重新组合 3 个。问题是我必须组合不相同但 相邻 的键。这意味着如果我有 (1, 'A'), (2, 'B'), (3, 'C'),....
,我希望能够生成 (1, 'ABC').
我不知道该怎么做。我怀疑我需要使用 combineByKey 并让它只有条件地产生输出。如果它满足我的条件,我是否只是让它只产生可以被 combineByKey 使用的输出?我应该这样做吗?
编辑:
这是我的输入:[(0, 'A'), (1, 'A'), (2, 'B'), (3, 'A'), (4, 'C'), ....]
我想要这样的输出:[(0, 0, 'AAB'), (0, 1, 'ABX'), ...]
和 [(1, 0, 'A'), (1, 1, 'ABA'), (1, 2, 'CXX')...]
.
格式为[(reading frame, first base #, sequence)]
您可以尝试这样的操作:
seq = sc.parallelize(zip(xrange(16), "ATCGATGCATGCATGC"))
(seq
.flatMap(lambda (pos, x): ((pos - i, (pos, x)) for i in range(3)))
.groupByKey()
.mapValues(lambda x: ''.join(v for (pos, v) in sorted(x)))
.filter(lambda (pos, codon): len(codon) == 3)
.map(lambda (pos, codon): (pos % 3, pos, codon))
.collect())
结果:
[(0, 0, 'ATC'),
(1, 1, 'TCG'),
(2, 2, 'CGA'),
(0, 3, 'GAT'),
(1, 4, 'ATG'),
(2, 5, 'TGC'),
(0, 6, 'GCA'),
(1, 7, 'CAT'),
(2, 8, 'ATG'),
(0, 9, 'TGC'),
(1, 10, 'GCA'),
(2, 11, 'CAT'),
(0, 12, 'ATG'),
(1, 13, 'TGC')]
实际上我会尝试其他方法:
from toolz.itertoolz import sliding_window, iterate, map, zip
from itertools import product
from numpy import uint8
def inc(x):
return x + uint8(1)
# Create dictionary mapping from codon to integer
mapping = dict(zip(product('ATCG', repeat=3), iterate(inc, uint8(0))))
seq = sc.parallelize(["ATCGATGCATGCATGC"])
(seq
# Generate pairs (start-position, 3-gram)
.flatMap(lambda s: zip(iterate(inc, 0), sliding_window(3, s)))
# Map 3-grams to respective integers
.map(lambda (pos, seq): (pos, mapping.get(seq)))
.collect())
阅读框显然是多余的,随时可以从起始位置得到所以这里省略了。
密码子和小整数之间的简单映射可以节省大量内存和流量。
我遇到这个问题已经有一段时间了,我认为这与我对如何使用 combineByKey 和 reduceByKey 缺乏了解有关,所以希望有人能解决这个问题。
我正在处理 DNA 序列,所以我有一个程序来生成它的一堆不同版本(向前、向后和称赞)。我有几个阅读框架,这意味着对于字符串 ABCABC
,我想要以下一系列键:ABC ABC
、A BCA BC
、AB CAB C
.
现在我正在使用以下函数来分解(我在 flatMap 过程中运行):
# Modified from
def chunkCodons((seq, strand, reading_frame)):
"""
Yield successive codons from seq
"""
# Get the first characters
if reading_frame > 0:
yield (0, seq[0:reading_frame], strand, reading_frame)
for i in xrange(reading_frame, len(seq), 3):
if i % 1000000 == 0:
print "Base # = {:,}".format(i)
yield (i, seq[i:i + 3], strand, reading_frame)
我运行这样的:reading_frames_rdd = nascent_reading_frames_rdd.flatMap(chunkCodons)
然而,这在一长串 DNA 上需要很长时间,所以我知道这一定是错的。
因此,我想让 Spark 以更直接的方式执行此操作,方法是按字符(即基础)分解它,然后一次重新组合 3 个。问题是我必须组合不相同但 相邻 的键。这意味着如果我有 (1, 'A'), (2, 'B'), (3, 'C'),....
,我希望能够生成 (1, 'ABC').
我不知道该怎么做。我怀疑我需要使用 combineByKey 并让它只有条件地产生输出。如果它满足我的条件,我是否只是让它只产生可以被 combineByKey 使用的输出?我应该这样做吗?
编辑:
这是我的输入:[(0, 'A'), (1, 'A'), (2, 'B'), (3, 'A'), (4, 'C'), ....]
我想要这样的输出:[(0, 0, 'AAB'), (0, 1, 'ABX'), ...]
和 [(1, 0, 'A'), (1, 1, 'ABA'), (1, 2, 'CXX')...]
.
格式为[(reading frame, first base #, sequence)]
您可以尝试这样的操作:
seq = sc.parallelize(zip(xrange(16), "ATCGATGCATGCATGC"))
(seq
.flatMap(lambda (pos, x): ((pos - i, (pos, x)) for i in range(3)))
.groupByKey()
.mapValues(lambda x: ''.join(v for (pos, v) in sorted(x)))
.filter(lambda (pos, codon): len(codon) == 3)
.map(lambda (pos, codon): (pos % 3, pos, codon))
.collect())
结果:
[(0, 0, 'ATC'),
(1, 1, 'TCG'),
(2, 2, 'CGA'),
(0, 3, 'GAT'),
(1, 4, 'ATG'),
(2, 5, 'TGC'),
(0, 6, 'GCA'),
(1, 7, 'CAT'),
(2, 8, 'ATG'),
(0, 9, 'TGC'),
(1, 10, 'GCA'),
(2, 11, 'CAT'),
(0, 12, 'ATG'),
(1, 13, 'TGC')]
实际上我会尝试其他方法:
from toolz.itertoolz import sliding_window, iterate, map, zip
from itertools import product
from numpy import uint8
def inc(x):
return x + uint8(1)
# Create dictionary mapping from codon to integer
mapping = dict(zip(product('ATCG', repeat=3), iterate(inc, uint8(0))))
seq = sc.parallelize(["ATCGATGCATGCATGC"])
(seq
# Generate pairs (start-position, 3-gram)
.flatMap(lambda s: zip(iterate(inc, 0), sliding_window(3, s)))
# Map 3-grams to respective integers
.map(lambda (pos, seq): (pos, mapping.get(seq)))
.collect())
阅读框显然是多余的,随时可以从起始位置得到所以这里省略了。
密码子和小整数之间的简单映射可以节省大量内存和流量。