如何使用 Spark 创建 MapFile 并访问它?

How to creating a MapFile with Spark and access it?

我正在尝试从 Spark RDD 创建 MapFile,但找不到足够的信息。到目前为止,这是我的步骤:

我开始于,

rdd.saveAsNewAPIHadoopFile(....MapFileOutputFormat.class)

引发异常,因为必须对 MapFiles 进行排序。 所以我修改为:

rdd.sortByKey().saveAsNewAPIHadoopFile(....MapFileOutputFormat.class)

效果很好,我的 MapFile 已创建。所以下一步是访问文件。使用创建 parts 的目录名称失败,表示找不到 data 文件。回到 Google,我发现为了访问我需要使用的 MapFile 部分:

Object ret = new Object();//My actual WritableComparable impl
Reader[] readers = MapFileOutputFormat.getReaders(new Path(file), new Configuration());
Partitioner<K,V> p = new HashPartitioner<>();
Writable e = MapFileOutputFormat.getEntry(readers, p key, ret);

天真地,我忽略了 HashPartioner 位并期望这会找到我的条目,但没有运气。所以我的下一步是遍历 reader 并执行 get(..)。该解决方案确实有效,但速度极慢,因为文件是由 128 个任务创建的,导致 128 个 part 个文件。

所以我调查了 HashPartitioner 的重要性,发现它在内部使用它来识别要使用哪个 reader,但似乎 Spark 没有使用相同的分区逻辑。所以我修改为:

rdd.partitionBy(new org.apache.spark.HashPartitioner(128)).sortByKey().saveAsNewAPIHadoopFile(....MapFileOutputFormat.class)

但是 2 HashPartioner 还是不匹配。所以问题部分...

任何关于此的链接将不胜感激。

PS。如果条目已排序,那么如何使用 HashPartitioner 找到正确的 Reader?这意味着数据 partsHash Partitioned 然后按键排序。所以我也尝试了 rdd.repartiotionAndSortWithinPartitions(new HashPartitioner(280)),但还是没有成功。

深入研究发现Spark HashPartitioner和Hadoop HashPartitioner的逻辑不一样。

所以我尝试并有效的 "brute force" 解决方案如下。

使用 rdd.repartitionAndSortWithinPArtitions(new org.apache.aprk.HashPartitioner(num_of_parititions)).saveAsNewAPIHadoopFile(....MapFileOutputFormat.class);

保存 MapFile

查找使用:

  • Reader[] 读者 = MapFileOutputFormat.getReaders(新路径(文件),新配置());
  • org.apache.aprk.HashPartitioner p = 新 org.apache.aprk.HashPartitioner(readers.length);
  • 读者[p.getPartition(key)].get(key,val);

这是 "dirty",因为 MapFile 访问现在绑定到 Spark 分区程序,而不是直观的 Hadoop HashPartitioner。不过,我可以实现一个使用 Hadoop HashPartitioner 进行改进的 Spark 分区程序。

这也没有解决访问相对较多的减速器速度慢的问题。通过从分区程序生成文件部件号,我什至可以做到这一点 'dirtier',但我正在寻找一个干净的解决方案,所以如果有更好的方法解决这个问题,请post。