运行 Spark 中 map 函数内部的 ML 算法

Run ML algorithm inside map function in Spark

所以我已经尝试了几天 运行 Spark 映射函数中的 ML 算法。我发布了更具体的 question 但引用 Spark 的 ML 算法时出现以下错误:

AttributeError: Cannot load _jvm from SparkContext. Is SparkContext initialized?

显然我不能在 apply_classifier 函数中引用 SparkContext。 我的代码类似于我在上一个问题中提出的建议,但仍然没有找到我正在寻找的解决方案:

def apply_classifier(clf):
    dt = DecisionTreeClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", maxDepth=3)
    if clf == 0:
        clf = DecisionTreeClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", maxDepth=3)
    elif clf == 1:
        clf = RandomForestClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", numTrees=5)

classifiers = [0, 1]

sc.parallelize(classifiers).map(lambda x: apply_classifier(x)).collect() 

我尝试使用 flatMap 而不是 map 但我得到 NoneType object is not iterable.

我还想将一个广播数据集(它是一个 DataFrame)作为参数传递给 apply_classifier 函数。 最后,是否有可能做我想做的事?有哪些替代方案?

is it possible to do what I am trying to do?

不是。 Apache Spark 不支持任何形式的嵌套,分布式操作只能由驱动程序初始化。这包括访问分布式数据结构,例如 Spark DataFrame.

What are the alternatives?

这取决于许多因素,例如数据大小、可用资源量和算法选择。一般来说,您有三种选择:

  • 仅将 Spark 用作任务管理工具来训练本地非分布式模型。看起来您已经在某种程度上探索了这条道路。对于此方法的更高级实现,您可以查看 spark-sklearn.

    一般来说,当数据相对较小时,这种方法特别有用。它的优点是多个工作之间没有竞争。

  • 使用标准多线程工具从单个上下文提交多个独立作业。您可以使用例如 or joblib.

    虽然这种方法可行,但我不会在实践中推荐它。并非所有 Spark 组件都是线程安全的,您必须非常小心以避免意外行为。它还使您几乎无法控制资源分配。

  • 参数化您的 Spark 应用程序并使用外部管道管理器 (Apache Airflow, Luigi, Toil) 提交您的作业。

    虽然这种方法有一些缺点(它需要将数据保存到持久存储中),但它也是最通用和最强大的,并且可以对资源分配进行很多控制。