为什么使用MapReduce和HDFS时List<String>总是空的?
Why List<String> is always empty when using MapReduce and HDFS?
所以我有一个程序,它使用 Mapper、Combiner 和 Reducer 来获取IMDB 存储库的某些字段和此程序在我 运行 在我的机器上运行时运行良好。
当我使用 Hadoop HDFS 将此代码放入 Docker 中的 运行 时,它没有得到我需要的一些值,或者准确地说,Combiner 将一些值放入 List,即 public class variable, 不起作用什么的因为当我尝试在 Reducer[= 中使用那个 List 38=] 看起来它总是空的。当我在我的机器上 运行ning 时(没有 Docker 和 Hadoop HDFS)它会将值放入List 但是当 运行ning on Docker 时,它看起来总是空的。我还在 main 上打印了 List 的大小,它 returns 0,有什么建议吗?
public class FromParquetToParquetFile{
public static List<String> top10 = new ArrayList<>();
....
}
Combiner 看起来像:
public static class FromParquetQueriesCombiner extends Reducer<Text,Text, Text,Text> {
@Override
protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
long total = 0;
long maior = -1;
String tconst = "";
String title = "";
for (Text value : values) {
total++; //numero de filmes
String[] fields = value.toString().split("\t");
top10.add(key.toString() + "\t" + fields[2] + "\t" + fields[3] + "\t" + fields[0] + "\t" + fields[1]);
int x = Integer.parseInt(fields[3]);
if (x >= maior) {
tconst = fields[0];
title = fields[1];
maior = x;
}
}
StringBuilder result = new StringBuilder();
result.append(total);
result.append("\t");
result.append(tconst);
result.append("\t");
result.append(title);
result.append("\t");
context.write(key, new Text(result.toString()));
}
}
Reducer 看起来像(它有一个排序列表的设置):
public static class FromParquetQueriesReducer extends Reducer<Text, Text, Void, GenericRecord> {
private Schema schema;
@Override
protected void setup(Context context) throws IOException, InterruptedException {
Collections.sort(top10, new Comparator<String>() {
@Override
public int compare(String o1, String o2) {
String[] aux = o1.split("\t");
String[] aux2 = o2.split("\t");
...
return -result;
}
});
schema = getSchema("hdfs:///schema.alinea2");
}
@Override
protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
...
for(String s : top10)
...
}
}
如 所述,“public”变量(在 Java 意义上)不能完全“转化”为旨在在分布式系统中实施的并行计算模型(这就是为什么当您在本地 运行 应用程序没有任何问题时,当您 运行 它沿着 HDFS 运行时,事情“崩溃”了。
Mapper 和 Reducer 实例是隔离的,或多或少“独立”于描述它们的函数“周围”的任何东西。这意味着他们实际上无法访问放在父 class(即这里的 FromParquetToParquetFile
)或程序的 driver/main 函数中的变量。从中我们可以理解(如果你想保留你的工作的当前功能方式)我们需要某种类型的风险解决方法(或直接的黑客工作)来制作列表 public 可以访问和“静态”在我们正在处理的主题限制内。
此问题的解决方案是设置引用作业的 Configuration 对象的用户命名值。这意味着您必须使用您可能在驱动程序中创建的 Configuration
对象来将 top10
设置为此类变量。由于您的 List
may 每个元素的长度相对“小” Strings
(即只有几个句子),您所要做的就是使用某种一个分隔符,用于将所有元素存储在一个 String
中(因为这是用于 Configuration
变量类型的数据类型),例如 element1#element2#element3#...
(但是 要非常小心,因为你必须总是确保有足够的内存让String
首先存在,这就是为什么这只是一个解决方法 毕竟)。
public static void main(String[] args) throws Exception
{
Configuration conf = new Configuration();
conf.set("top10", " "); // initialize `top10` as an empty String
// the description of the job(s), etc, ...
}
为了读取和写入 top10
,首先你需要将它声明为 setup
函数,你需要在你的组合器和你的减速器中像这样(用当然,下面的代码片段显示了减速器的外观):
public static class FromParquetQueriesReducer extends Reducer<Text, Text, Void, GenericRecord>
{
private Schema schema;
private String top10;
@Override
protected void setup(Context context) throws IOException, InterruptedException
{
top10 = context.getConfiguration().get("top10");
// everything else inside the setup function...
}
// ...
}
通过这两个调整,您可以在 reduce
函数中使用 top10
就好了,在使用 split
函数从 [=18] 中拆分元素之后=] 的 top10
像这样:
String[] data = top10.split("#"); // split the elements from the String
List<String> top10List = new ArrayList<>(); // create ArrayList
Collections.addAll(top10List, data); // put all the elements to the list
综上所述,我必须说这种功能远远超出了严重依赖 MapReduce 的普通 Hadoop 的能力。如果这不仅仅是 CS class 分配,您需要在此处重新评估 Hadoop 的 MapReduce 引擎的使用情况,以便通过 Apache Hive or Apache Spark 这样的“扩展”来解决所有这些问题更灵活,更像 SQL,可以匹配您应用程序的某些方面。
所以我有一个程序,它使用 Mapper、Combiner 和 Reducer 来获取IMDB 存储库的某些字段和此程序在我 运行 在我的机器上运行时运行良好。
当我使用 Hadoop HDFS 将此代码放入 Docker 中的 运行 时,它没有得到我需要的一些值,或者准确地说,Combiner 将一些值放入 List,即 public class variable, 不起作用什么的因为当我尝试在 Reducer[= 中使用那个 List 38=] 看起来它总是空的。当我在我的机器上 运行ning 时(没有 Docker 和 Hadoop HDFS)它会将值放入List 但是当 运行ning on Docker 时,它看起来总是空的。我还在 main 上打印了 List 的大小,它 returns 0,有什么建议吗?
public class FromParquetToParquetFile{
public static List<String> top10 = new ArrayList<>();
....
}
Combiner 看起来像:
public static class FromParquetQueriesCombiner extends Reducer<Text,Text, Text,Text> {
@Override
protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
long total = 0;
long maior = -1;
String tconst = "";
String title = "";
for (Text value : values) {
total++; //numero de filmes
String[] fields = value.toString().split("\t");
top10.add(key.toString() + "\t" + fields[2] + "\t" + fields[3] + "\t" + fields[0] + "\t" + fields[1]);
int x = Integer.parseInt(fields[3]);
if (x >= maior) {
tconst = fields[0];
title = fields[1];
maior = x;
}
}
StringBuilder result = new StringBuilder();
result.append(total);
result.append("\t");
result.append(tconst);
result.append("\t");
result.append(title);
result.append("\t");
context.write(key, new Text(result.toString()));
}
}
Reducer 看起来像(它有一个排序列表的设置):
public static class FromParquetQueriesReducer extends Reducer<Text, Text, Void, GenericRecord> {
private Schema schema;
@Override
protected void setup(Context context) throws IOException, InterruptedException {
Collections.sort(top10, new Comparator<String>() {
@Override
public int compare(String o1, String o2) {
String[] aux = o1.split("\t");
String[] aux2 = o2.split("\t");
...
return -result;
}
});
schema = getSchema("hdfs:///schema.alinea2");
}
@Override
protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
...
for(String s : top10)
...
}
}
如
Mapper 和 Reducer 实例是隔离的,或多或少“独立”于描述它们的函数“周围”的任何东西。这意味着他们实际上无法访问放在父 class(即这里的 FromParquetToParquetFile
)或程序的 driver/main 函数中的变量。从中我们可以理解(如果你想保留你的工作的当前功能方式)我们需要某种类型的风险解决方法(或直接的黑客工作)来制作列表 public 可以访问和“静态”在我们正在处理的主题限制内。
此问题的解决方案是设置引用作业的 Configuration 对象的用户命名值。这意味着您必须使用您可能在驱动程序中创建的 Configuration
对象来将 top10
设置为此类变量。由于您的 List
may 每个元素的长度相对“小” Strings
(即只有几个句子),您所要做的就是使用某种一个分隔符,用于将所有元素存储在一个 String
中(因为这是用于 Configuration
变量类型的数据类型),例如 element1#element2#element3#...
(但是 要非常小心,因为你必须总是确保有足够的内存让String
首先存在,这就是为什么这只是一个解决方法 毕竟)。
public static void main(String[] args) throws Exception
{
Configuration conf = new Configuration();
conf.set("top10", " "); // initialize `top10` as an empty String
// the description of the job(s), etc, ...
}
为了读取和写入 top10
,首先你需要将它声明为 setup
函数,你需要在你的组合器和你的减速器中像这样(用当然,下面的代码片段显示了减速器的外观):
public static class FromParquetQueriesReducer extends Reducer<Text, Text, Void, GenericRecord>
{
private Schema schema;
private String top10;
@Override
protected void setup(Context context) throws IOException, InterruptedException
{
top10 = context.getConfiguration().get("top10");
// everything else inside the setup function...
}
// ...
}
通过这两个调整,您可以在 reduce
函数中使用 top10
就好了,在使用 split
函数从 [=18] 中拆分元素之后=] 的 top10
像这样:
String[] data = top10.split("#"); // split the elements from the String
List<String> top10List = new ArrayList<>(); // create ArrayList
Collections.addAll(top10List, data); // put all the elements to the list
综上所述,我必须说这种功能远远超出了严重依赖 MapReduce 的普通 Hadoop 的能力。如果这不仅仅是 CS class 分配,您需要在此处重新评估 Hadoop 的 MapReduce 引擎的使用情况,以便通过 Apache Hive or Apache Spark 这样的“扩展”来解决所有这些问题更灵活,更像 SQL,可以匹配您应用程序的某些方面。