Pyspark parallel ml.KMeans overwrite each other's K

I followed this post to run KMeans in parallel. I'm using Python 2.7 and Spark 2.0.2 on EMR.

How to run multiple jobs in one Sparkcontext from separate threads in PySpark?

As stated in that post, jobs submitted from separate threads should not affect each other.

"Inside a given Spark application (SparkContext instance), multiple parallel jobs can run simultaneously if they were submitted from separate threads. By “job”, in this section, we mean a Spark action (e.g. save, collect) and any tasks that need to run to evaluate that action. Spark’s scheduler is fully thread-safe and supports this use case to enable applications that serve multiple requests (e.g. queries for multiple users)." http://spark.apache.org/docs/latest/job-scheduling.html

However, the resulting models end up with a different number of clusters K than the one that was passed in.

Code:

from pyspark.ml.clustering import KMeans
from sklearn.datasets.samples_generator import make_blobs
from pyspark.ml.linalg import Vectors
import random
random.seed(1)

# Generate n_groups * group_size two-dimensional points from n_centers blobs
# and split them into n_groups groups.
group_size = 30
n_groups = 20

n_samples = n_groups * group_size
n_features = 2
n_centers = 4

xs, ys = make_blobs(n_samples=n_samples, n_features=n_features, centers=n_centers, cluster_std=1.0, center_box=(-10.0, 10.0), shuffle=True, random_state=None)
x_groups = []
for i in range(n_groups):
    x_groups.append(xs[i*group_size: (i+1)*group_size])


def do_kmean(xs):
    # Build a one-column DataFrame of feature vectors for this group.
    # `spark` is the SparkSession already available in the pyspark shell on EMR.
    data = []
    for x in xs:
        data.append((Vectors.dense(x.tolist()),))
    df = spark.createDataFrame(data, ["features"])

    # Pick a random K per group, fit, and return the requested K alongside
    # the K the estimator reports after fitting.
    num_clusters = random.randint(5, 10)
    kmeans = KMeans(k=num_clusters, maxIter=1, seed=1, featuresCol="features", predictionCol="prediction")
    model = kmeans.fit(df)
    return [num_clusters, kmeans.getK()]

from multiprocessing.pool import ThreadPool
tpool = ThreadPool(processes=8)

# Fit the 20 groups concurrently from 8 threads sharing one SparkContext.
result = tpool.map(do_kmean, x_groups)
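
(Side note: the K a fitted model actually ended up with can also be read off the model itself, independent of the estimator's possibly clobbered k param. A minimal check, assuming the model returned by kmeans.fit(df) above:)

# Count the cluster centres the fitted model actually holds; this does not
# depend on the (possibly overwritten) k param of the KMeans estimator.
actual_k = len(model.clusterCenters())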

Result (input K vs. the K KMeans actually used):

[[5, 9],
 [8, 9],
 [6, 8],
 [10, 9],
 [7, 9],
 [9, 9],
 [7, 9],
 [9, 9],
 [5, 5],
 [5, 9],
 [9, 7],
 [9, 9],
 [5, 7],
 [10, 5],
 [7, 7],
 [7, 7],
 [6, 6],
 [10, 10],
 [10, 10],
 [5, 5]]

It looks as if Spark is not thread/process safe here and is reading another thread's copy of K. Is there a Spark configuration that could cause this, or is it a Spark bug?

This is indeed a bug in Spark 2.0.2 and 2.1.0. I was able to reproduce it on my local machine with both versions. The bug is fixed in Spark 2.1.1.

https://issues.apache.org/jira/browse/SPARK-19348
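
If upgrading past 2.1.0 is not immediately possible, one possible workaround (a sketch only, not something the ticket prescribes) is to serialize estimator construction and fitting with a lock so concurrent threads cannot clobber each other's params. Note that this gives up the concurrency of the fit calls themselves; it reuses the imports, data, and spark session from the question:

from threading import Lock

fit_lock = Lock()

def do_kmean_locked(xs):
    data = [(Vectors.dense(x.tolist()),) for x in xs]
    df = spark.createDataFrame(data, ["features"])
    num_clusters = random.randint(5, 10)
    # Hold the lock across construction and fit() so only one thread at a
    # time goes through the param-copying machinery that the buggy versions
    # do not protect.
    with fit_lock:
        kmeans = KMeans(k=num_clusters, maxIter=1, seed=1,
                        featuresCol="features", predictionCol="prediction")
        model = kmeans.fit(df)
    # Report the number of centres the fitted model actually produced.
    return [num_clusters, len(model.clusterCenters())]

Upgrading to Spark 2.1.1 or later remains the proper fix.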