如何从迭代器创建 Spark RDD?
How to create Spark RDD from an iterator?
为了清楚起见,我不是从 array/list 之类的
寻找 RDD
List<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 6, 7); // sample
JavaRDD<Integer> rdd = new JavaSparkContext().parallelize(list);
如何从 java 迭代器创建 spark RDD 而无需 完全缓冲 它在内存中?
Iterator<Integer> iterator = Arrays.asList(1, 2, 3, 4).iterator(); //sample iterator for illustration
JavaRDD<Integer> rdd = new JavaSparkContext().what("?", iterator); //the Question
附加问题:
是否要求源可重读(或能够多次读取)以提供 RDD 的弹性?换句话说,由于迭代器基本上是一次性读取的,是否有可能从迭代器创建弹性分布式数据集 (RDD)?
正如其他人所说,您可以使用 spark streaming 做一些事情,但是对于纯 spark,您不能,原因是您的要求与 spark 的模型背道而驰。让我解释。
为了分发和并行化工作,spark 必须将其分成块。从 HDFS 读取时,'chunking' 由 HDFS 为 Spark 完成,因为 HDFS 文件是按块组织的。 Spark 通常会为每个块生成一个任务。
现在,迭代器只提供对数据的顺序访问,所以 spark 不可能将它组织成块 而不是在内存中全部读取它 .
也许可以构建一个具有单个可迭代分区的 RDD,但即便如此,也无法确定是否可以将 Iterable 的实现发送给 worker。使用 sc.parallelize() 时,spark 创建实现 serializable
的分区,因此每个分区都可以发送给不同的工作人员。可迭代对象可以通过网络连接,或本地 FS 中的文件,因此除非它们在内存中缓冲,否则它们不能发送给工作人员。
超级老问题,但我会在序列化后在 flatMap 中创建迭代器。
var ranges = Arrays.asList(Pair.of(1,7), Pair.of(0,5));
JavaRDD<Integer> data = sparkContext.parallelize(ranges).flatMap(pair -> Flux.range(pair.left(), pair.right()).toStream().iterator());
为了清楚起见,我不是从 array/list 之类的
寻找 RDDList<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 6, 7); // sample
JavaRDD<Integer> rdd = new JavaSparkContext().parallelize(list);
如何从 java 迭代器创建 spark RDD 而无需 完全缓冲 它在内存中?
Iterator<Integer> iterator = Arrays.asList(1, 2, 3, 4).iterator(); //sample iterator for illustration
JavaRDD<Integer> rdd = new JavaSparkContext().what("?", iterator); //the Question
附加问题:
是否要求源可重读(或能够多次读取)以提供 RDD 的弹性?换句话说,由于迭代器基本上是一次性读取的,是否有可能从迭代器创建弹性分布式数据集 (RDD)?
正如其他人所说,您可以使用 spark streaming 做一些事情,但是对于纯 spark,您不能,原因是您的要求与 spark 的模型背道而驰。让我解释。 为了分发和并行化工作,spark 必须将其分成块。从 HDFS 读取时,'chunking' 由 HDFS 为 Spark 完成,因为 HDFS 文件是按块组织的。 Spark 通常会为每个块生成一个任务。 现在,迭代器只提供对数据的顺序访问,所以 spark 不可能将它组织成块 而不是在内存中全部读取它 .
也许可以构建一个具有单个可迭代分区的 RDD,但即便如此,也无法确定是否可以将 Iterable 的实现发送给 worker。使用 sc.parallelize() 时,spark 创建实现 serializable
的分区,因此每个分区都可以发送给不同的工作人员。可迭代对象可以通过网络连接,或本地 FS 中的文件,因此除非它们在内存中缓冲,否则它们不能发送给工作人员。
超级老问题,但我会在序列化后在 flatMap 中创建迭代器。
var ranges = Arrays.asList(Pair.of(1,7), Pair.of(0,5));
JavaRDD<Integer> data = sparkContext.parallelize(ranges).flatMap(pair -> Flux.range(pair.left(), pair.right()).toStream().iterator());