PySpark 重新分区 RDD 元素
PySpark repartitioning RDD elements
我有一个 spark 作业,它从 Kafka 流中读取数据并为流中的每个 RDD 执行一个操作。如果RDD不为空,我想把RDD保存到HDFS,但是我想为RDD中的每个元素创建一个文件。我找到了
RDD.saveAsTextFile(file_location)
将为每个分区创建一个文件,因此我试图更改 RDD,使每个分区仅包含一个元素。这是我正在尝试做的一个例子
data = sc.parallelize(['1', '2', '3', '4', '5', '6', '7', '8', '9', '0'])
data.glom().collect() #Produces [['1', '2', '3', '4', '5'], ['6', '7', '8', '9', '0']]
data.saveAsTextFile(file_location) #Produces 2 files
我可以更接近我想要的,但我找不到一种方法来确保每个分区只有一个元素
data1 = data.coalesce(1, True).repartition(data.count())
data1.glom().collect() #Produces [[], ['1', '2', '3', '4', '5'], ['6', '7', '8', '9', '0'], [], [], [], [], [], [], []]
data2 = data.map(lambda t : t).coalesce(1, True).repartition(data.count())
data2.glom().collect() #Produces [[], ['1'], ['2', '3'], ['4', '5'], ['6'], ['7', '8'], ['9', '0'], [], [], []]
data2.saveAsTextFile(file_location) #Produces 10 files, but some are empty
我知道在这个例子中我可以将我想要的分区传递给 sc.parallelize() 但是当我从 kafka 流中读取时这是不可能的。关于如何按照我想要的方式重新分区,或者如何更好地解决这个问题,有什么建议吗?
python 分区器 API 在下面使用哈希分区器,这就是为什么即使您有 K 个桶,您仍然可以获得一些 "collisions"。如果您可以在 Scala 中执行此操作,则可以提供一个自定义分区程序(基于范围 + 桶数 == num elems 可能会成功)。但是每个分区都有一些开销(重新分区是一项昂贵的操作),执行保存逻辑而不是 foreach 而不是重新分区可能更合理。
好吧,这里有一个 python 自定义分区的解决方案。
(明确一点,将每个元素放在单独的文件中可能不是最好的设计)。
data = sc.parallelize(['1', '2', '3', '4', '5', '6', '7', '8', '9', '0']).map(lambda x: (x,x))
print data.collect()
c = data.count()
wp = data.partitionBy(c,lambda k: int(k))
print wp.map(lambda t: t[0]).glom().collect()
sc.stop()
结果:
[('1', '1'), ('2', '2'), ('3', '3'), ('4', '4'), ('5', '5'), ('6', '6'), ('7', '7'), ('8', '8'), ('9', '9'), ('0', '0')]
[['0'], ['1'], ['2'], ['3'], ['4'], ['5'], ['6'], ['7'], ['8'], ['9']]
希望对您有所帮助。
我有一个 spark 作业,它从 Kafka 流中读取数据并为流中的每个 RDD 执行一个操作。如果RDD不为空,我想把RDD保存到HDFS,但是我想为RDD中的每个元素创建一个文件。我找到了
RDD.saveAsTextFile(file_location)
将为每个分区创建一个文件,因此我试图更改 RDD,使每个分区仅包含一个元素。这是我正在尝试做的一个例子
data = sc.parallelize(['1', '2', '3', '4', '5', '6', '7', '8', '9', '0'])
data.glom().collect() #Produces [['1', '2', '3', '4', '5'], ['6', '7', '8', '9', '0']]
data.saveAsTextFile(file_location) #Produces 2 files
我可以更接近我想要的,但我找不到一种方法来确保每个分区只有一个元素
data1 = data.coalesce(1, True).repartition(data.count())
data1.glom().collect() #Produces [[], ['1', '2', '3', '4', '5'], ['6', '7', '8', '9', '0'], [], [], [], [], [], [], []]
data2 = data.map(lambda t : t).coalesce(1, True).repartition(data.count())
data2.glom().collect() #Produces [[], ['1'], ['2', '3'], ['4', '5'], ['6'], ['7', '8'], ['9', '0'], [], [], []]
data2.saveAsTextFile(file_location) #Produces 10 files, but some are empty
我知道在这个例子中我可以将我想要的分区传递给 sc.parallelize() 但是当我从 kafka 流中读取时这是不可能的。关于如何按照我想要的方式重新分区,或者如何更好地解决这个问题,有什么建议吗?
python 分区器 API 在下面使用哈希分区器,这就是为什么即使您有 K 个桶,您仍然可以获得一些 "collisions"。如果您可以在 Scala 中执行此操作,则可以提供一个自定义分区程序(基于范围 + 桶数 == num elems 可能会成功)。但是每个分区都有一些开销(重新分区是一项昂贵的操作),执行保存逻辑而不是 foreach 而不是重新分区可能更合理。
好吧,这里有一个 python 自定义分区的解决方案。
(明确一点,将每个元素放在单独的文件中可能不是最好的设计)。
data = sc.parallelize(['1', '2', '3', '4', '5', '6', '7', '8', '9', '0']).map(lambda x: (x,x))
print data.collect()
c = data.count()
wp = data.partitionBy(c,lambda k: int(k))
print wp.map(lambda t: t[0]).glom().collect()
sc.stop()
结果:
[('1', '1'), ('2', '2'), ('3', '3'), ('4', '4'), ('5', '5'), ('6', '6'), ('7', '7'), ('8', '8'), ('9', '9'), ('0', '0')]
[['0'], ['1'], ['2'], ['3'], ['4'], ['5'], ['6'], ['7'], ['8'], ['9']]
希望对您有所帮助。