为什么使用MapReduce和HDFS时List<String>总是空的?

Why List<String> is always empty when using MapReduce and HDFS?

所以我有一个程序,它使用 MapperCombinerReducer 来获取IMDB 存储库的某些字段和此程序在我 运行 在我的机器上运行时运行良好。

当我使用 Hadoop HDFS 将此代码放入 Docker 中的 运行 时,它没有得到我需要的一些值,或者准确地说,Combiner 将一些值放入 List,即 public class variable, 不起作用什么的因为当我尝试在 Reducer[= 中使用那个 List 38=] 看起来它总是空的。当我在我的机器上 运行ning 时(没有 DockerHadoop HDFS)它会将值放入List 但是当 运行ning on Docker 时,它看起来总是空的。我还在 main 上打印了 List 的大小,它 returns 0,有什么建议吗?

public class FromParquetToParquetFile{

    public static List<String> top10 = new ArrayList<>();
    ....
}

Combiner 看起来像:

public static class FromParquetQueriesCombiner extends Reducer<Text,Text, Text,Text> {

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {

            long total = 0;
            long maior = -1;
            String tconst = "";
            String title = "";

            for (Text value : values) {

                total++; //numero de filmes
                String[] fields = value.toString().split("\t");
                top10.add(key.toString() + "\t" + fields[2] + "\t" + fields[3] + "\t" + fields[0] + "\t" + fields[1]);
                int x = Integer.parseInt(fields[3]);

                if (x >= maior) {
                    tconst = fields[0]; 
                    title = fields[1]; 
                    maior = x;
                }
            }

            StringBuilder result = new StringBuilder();
            result.append(total);
            result.append("\t");
            result.append(tconst);
            result.append("\t");
            result.append(title);
            result.append("\t");
            context.write(key, new Text(result.toString()));
        }
    }

Reducer 看起来像(它有一个排序列表的设置):

public static class FromParquetQueriesReducer extends Reducer<Text, Text, Void, GenericRecord> {
        private Schema schema;

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            Collections.sort(top10, new Comparator<String>() {
                @Override
                public int compare(String o1, String o2) {
                    String[] aux = o1.split("\t");
                    String[] aux2 = o2.split("\t");

                    ...
                    return -result;
                }
            });
            schema = getSchema("hdfs:///schema.alinea2");
        }

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
         
           ...
           for(String s : top10)
           ...
           }
      }

所述,“public”变量(在 Java 意义上)不能完全“转化”为旨在在分布式系统中实施的并行计算模型(这就是为什么当您在本地 运行 应用程序没有任何问题时,当您 运行 它沿着 HDFS 运行时,事情“崩溃”了。

Mapper 和 Reducer 实例是隔离的,或多或少“独立”于描述它们的函数“周围”的任何东西。这意味着他们实际上无法访问放在父 class(即这里的 FromParquetToParquetFile)或程序的 driver/main 函数中的变量。从中我们可以理解(如果你想保留你的工作的当前功能方式)我们需要某种类型的风险解决方法(或直接的黑客工作)来制作列表 public 可以访问和“静态”在我们正在处理的主题限制内。

此问题的解决方案是设置引用作业的 Configuration 对象的用户命名值。这意味着您必须使用您可能在驱动程序中创建的 Configuration 对象来将 top10 设置为此类变量。由于您的 List may 每个元素的长度相对“小” Strings (即只有几个句子),您所要做的就是使用某种一个分隔符,用于将所有元素存储在一个 String 中(因为这是用于 Configuration 变量类型的数据类型),例如 element1#element2#element3#...但是 非常小心,因为你必须总是确保有足够的内存让String首先存在,这就是为什么这只是一个解决方法 毕竟)。

public static void main(String[] args) throws Exception 
{
    Configuration conf = new Configuration();

    conf.set("top10", " ");    // initialize `top10` as an empty String

    // the description of the job(s), etc, ...
}

为了读取和写入 top10,首先你需要将它声明为 setup 函数,你需要在你的组合器和你的减速器中像这样(用当然,下面的代码片段显示了减速器的外观):

public static class FromParquetQueriesReducer extends Reducer<Text, Text, Void, GenericRecord> 
{
    private Schema schema;
    private String top10;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException 
    {
        top10 = context.getConfiguration().get("top10");

        // everything else inside the setup function...
    }

    // ...
}

通过这两个调整,您可以在 reduce 函数中使用 top10 就好了,在使用 split 函数从 [=18] 中拆分元素之后=] 的 top10 像这样:

String[] data = top10.split("#");           // split the elements from the String
List<String> top10List = new ArrayList<>(); // create ArrayList 
Collections.addAll(top10List, data);        // put all the elements to the list

综上所述,我必须说这种功能远远超出了严重依赖 MapReduce 的普通 Hadoop 的能力。如果这不仅仅是 CS class 分配,您需要在此处重新评估 Hadoop 的 MapReduce 引擎的使用情况,以便通过 Apache Hive or Apache Spark 这样的“扩展”来解决所有这些问题更灵活,更像 SQL,可以匹配您应用程序的某些方面。