如何使用 Spark 创建 MapFile 并访问它?
How to creating a MapFile with Spark and access it?
我正在尝试从 Spark RDD 创建 MapFile,但找不到足够的信息。到目前为止,这是我的步骤:
我开始于,
rdd.saveAsNewAPIHadoopFile(....MapFileOutputFormat.class)
引发异常,因为必须对 MapFiles
进行排序。
所以我修改为:
rdd.sortByKey().saveAsNewAPIHadoopFile(....MapFileOutputFormat.class)
效果很好,我的 MapFile 已创建。所以下一步是访问文件。使用创建 parts
的目录名称失败,表示找不到 data
文件。回到 Google,我发现为了访问我需要使用的 MapFile
部分:
Object ret = new Object();//My actual WritableComparable impl
Reader[] readers = MapFileOutputFormat.getReaders(new Path(file), new Configuration());
Partitioner<K,V> p = new HashPartitioner<>();
Writable e = MapFileOutputFormat.getEntry(readers, p key, ret);
天真地,我忽略了 HashPartioner
位并期望这会找到我的条目,但没有运气。所以我的下一步是遍历 reader 并执行 get(..)
。该解决方案确实有效,但速度极慢,因为文件是由 128 个任务创建的,导致 128 个 part
个文件。
所以我调查了 HashPartitioner
的重要性,发现它在内部使用它来识别要使用哪个 reader,但似乎 Spark 没有使用相同的分区逻辑。所以我修改为:
rdd.partitionBy(new org.apache.spark.HashPartitioner(128)).sortByKey().saveAsNewAPIHadoopFile(....MapFileOutputFormat.class)
但是 2 HashPartioner
还是不匹配。所以问题部分...
- 有没有办法有效地组合
MapFiles
(因为这会忽略分区逻辑)?
MapFileOutputFormat.getReaders(new Path(file), new
Configuration());
非常慢。我可以识别更多 reader
高效?
- 我正在使用 MapR-FS 作为底层 DFS。这会使用相同的
HashParitioner
实现吗?
- 有没有办法避免重新分区,或者应该对整个文件的数据进行排序? (与在分区内排序相反)
- 我也遇到异常
_SUCCESS/data does not exist
。我需要手动删除这个文件吗?
任何关于此的链接将不胜感激。
PS。如果条目已排序,那么如何使用 HashPartitioner
找到正确的 Reader
?这意味着数据 parts
是 Hash Partitioned
然后按键排序。所以我也尝试了 rdd.repartiotionAndSortWithinPartitions(new HashPartitioner(280))
,但还是没有成功。
深入研究发现Spark HashPartitioner和Hadoop HashPartitioner的逻辑不一样。
所以我尝试并有效的 "brute force" 解决方案如下。
使用 rdd.repartitionAndSortWithinPArtitions(new
org.apache.aprk.HashPartitioner(num_of_parititions)).saveAsNewAPIHadoopFile(....MapFileOutputFormat.class);
保存 MapFile
查找使用:
- Reader[] 读者 = MapFileOutputFormat.getReaders(新路径(文件),新配置());
- org.apache.aprk.HashPartitioner p = 新 org.apache.aprk.HashPartitioner(readers.length);
- 读者[p.getPartition(key)].get(key,val);
这是 "dirty",因为 MapFile 访问现在绑定到 Spark 分区程序,而不是直观的 Hadoop HashPartitioner。不过,我可以实现一个使用 Hadoop HashPartitioner
进行改进的 Spark 分区程序。
这也没有解决访问相对较多的减速器速度慢的问题。通过从分区程序生成文件部件号,我什至可以做到这一点 'dirtier',但我正在寻找一个干净的解决方案,所以如果有更好的方法解决这个问题,请post。
我正在尝试从 Spark RDD 创建 MapFile,但找不到足够的信息。到目前为止,这是我的步骤:
我开始于,
rdd.saveAsNewAPIHadoopFile(....MapFileOutputFormat.class)
引发异常,因为必须对 MapFiles
进行排序。
所以我修改为:
rdd.sortByKey().saveAsNewAPIHadoopFile(....MapFileOutputFormat.class)
效果很好,我的 MapFile 已创建。所以下一步是访问文件。使用创建 parts
的目录名称失败,表示找不到 data
文件。回到 Google,我发现为了访问我需要使用的 MapFile
部分:
Object ret = new Object();//My actual WritableComparable impl
Reader[] readers = MapFileOutputFormat.getReaders(new Path(file), new Configuration());
Partitioner<K,V> p = new HashPartitioner<>();
Writable e = MapFileOutputFormat.getEntry(readers, p key, ret);
天真地,我忽略了 HashPartioner
位并期望这会找到我的条目,但没有运气。所以我的下一步是遍历 reader 并执行 get(..)
。该解决方案确实有效,但速度极慢,因为文件是由 128 个任务创建的,导致 128 个 part
个文件。
所以我调查了 HashPartitioner
的重要性,发现它在内部使用它来识别要使用哪个 reader,但似乎 Spark 没有使用相同的分区逻辑。所以我修改为:
rdd.partitionBy(new org.apache.spark.HashPartitioner(128)).sortByKey().saveAsNewAPIHadoopFile(....MapFileOutputFormat.class)
但是 2 HashPartioner
还是不匹配。所以问题部分...
- 有没有办法有效地组合
MapFiles
(因为这会忽略分区逻辑)? MapFileOutputFormat.getReaders(new Path(file), new Configuration());
非常慢。我可以识别更多 reader 高效?- 我正在使用 MapR-FS 作为底层 DFS。这会使用相同的
HashParitioner
实现吗? - 有没有办法避免重新分区,或者应该对整个文件的数据进行排序? (与在分区内排序相反)
- 我也遇到异常
_SUCCESS/data does not exist
。我需要手动删除这个文件吗?
任何关于此的链接将不胜感激。
PS。如果条目已排序,那么如何使用 HashPartitioner
找到正确的 Reader
?这意味着数据 parts
是 Hash Partitioned
然后按键排序。所以我也尝试了 rdd.repartiotionAndSortWithinPartitions(new HashPartitioner(280))
,但还是没有成功。
深入研究发现Spark HashPartitioner和Hadoop HashPartitioner的逻辑不一样。
所以我尝试并有效的 "brute force" 解决方案如下。
使用 rdd.repartitionAndSortWithinPArtitions(new
org.apache.aprk.HashPartitioner(num_of_parititions)).saveAsNewAPIHadoopFile(....MapFileOutputFormat.class);
查找使用:
- Reader[] 读者 = MapFileOutputFormat.getReaders(新路径(文件),新配置());
- org.apache.aprk.HashPartitioner p = 新 org.apache.aprk.HashPartitioner(readers.length);
- 读者[p.getPartition(key)].get(key,val);
这是 "dirty",因为 MapFile 访问现在绑定到 Spark 分区程序,而不是直观的 Hadoop HashPartitioner。不过,我可以实现一个使用 Hadoop HashPartitioner
进行改进的 Spark 分区程序。
这也没有解决访问相对较多的减速器速度慢的问题。通过从分区程序生成文件部件号,我什至可以做到这一点 'dirtier',但我正在寻找一个干净的解决方案,所以如果有更好的方法解决这个问题,请post。