如何处理 Apache Spark 中 foreach 块之前运行的代码?

How to deal with code that runs before foreach block in Apache Spark?

我正在尝试处理一些代码,这些代码在 Spark 独立模式下 运行 与集群上的 Spark 运行 不同。基本上,对于 RDD 中的每个项目,我试图将其添加到列表中,一旦完成,我想将此列表发送到 Solr。

当我在 Spark 的独立模式下 运行 以下代码时,这工作得很好,但当相同的代码在集群上 运行 时,它不起作用。当我 运行 集群上的相同代码时,就像 "send to Solr" 部分代码在要发送到 Solr 的列表充满项目之前执行。我尝试在foreach之后用solrInputDocumentJavaRDD.collect();强制执行,但是好像没有任何效果。

// For each RDD
solrInputDocumentJavaDStream.foreachRDD(
        new Function<JavaRDD<SolrInputDocument>, Void>() {
          @Override
          public Void call(JavaRDD<SolrInputDocument> solrInputDocumentJavaRDD) throws Exception {

            // For each item in a single RDD
            solrInputDocumentJavaRDD.foreach(
                    new VoidFunction<SolrInputDocument>() {
                      @Override
                      public void call(SolrInputDocument solrInputDocument) {

                        // Add the solrInputDocument to the list of SolrInputDocuments
                        SolrIndexerDriver.solrInputDocumentList.add(solrInputDocument);
                      }
                    });

            // Try to force execution
            solrInputDocumentJavaRDD.collect();


            // After having finished adding every SolrInputDocument to the list
            // add it to the solrServer, and commit, waiting for the commit to be flushed
            try {
              if (SolrIndexerDriver.solrInputDocumentList != null
                      && SolrIndexerDriver.solrInputDocumentList.size() > 0) {
                SolrIndexerDriver.solrServer.add(SolrIndexerDriver.solrInputDocumentList);
                SolrIndexerDriver.solrServer.commit(true, true);
                SolrIndexerDriver.solrInputDocumentList.clear();
              }
            } catch (SolrServerException | IOException e) {
              e.printStackTrace();
            }


            return null;
          }
        }
);

我应该怎么做,以便在将 SolrDocuments 列表添加到 solrInputDocumentList 后执行发送到 Solr 部分(并且也可以在集群模式下工作)?

你在spark下查看过吗UI看这个job的执行计划。 检查它是如何分成阶段及其依赖关系的。这应该会给你一个有希望的想法。

正如我在 Spark 邮件列表中提到的: 我不熟悉 Solr API 但前提是 'SolrIndexerDriver' 是一个单例,我想集群上的 运行 是调用:

SolrIndexerDriver.solrInputDocumentList.add(elem)

发生在不同 JVM 上的 SolrIndexerDriver 的不同单例实例上,而

SolrIndexerDriver.solrServer.commit

发生在驱动程序上。

实际上,正在填写执行程序上的列表,但从未提交它们,而在驱动程序上则相反。

推荐的处理方法是像这样使用 foreachPartition

rdd.foreachPartition{iter =>
  // prepare connection
  Stuff.connect(...)
  // add elements
  iter.foreach(elem => Stuff.add(elem))
  // submit
  Stuff.commit()
}

这样就可以将每个分区的数据相加,并在每个执行器的本地上下文中提交结果。请注意,此 add/commit 必须是线程安全的,以避免数据丢失或损坏。