PySpark parallel ml.KMeans jobs overwrite each other's K
I followed the post below to run KMeans in parallel. I was using Python 2.7 and Spark 2.0.2 on EMR.
How to run multiple jobs in one Sparkcontext from separate threads in PySpark?
As stated in that post, jobs submitted from separate threads should not affect each other.
"Inside a given Spark application (SparkContext instance), multiple parallel jobs can run simultaneously if they were submitted from separate threads. By “job”, in this section, we mean a Spark action (e.g. save, collect) and any tasks that need to run to evaluate that action. Spark’s scheduler is fully thread-safe and supports this use case to enable applications that serve multiple requests (e.g. queries for multiple users)."
http://spark.apache.org/docs/latest/job-scheduling.html
However, the number of clusters K in the resulting models differs from the K that was passed in.
Code:
from pyspark.ml.clustering import KMeans
from sklearn.datasets.samples_generator import make_blobs
from pyspark.ml.linalg import Vectors
import random
random.seed(1)
group_size = 30
n_groups = 20
n_samples= n_groups * group_size
n_features=2
n_centers=4
xs, ys = make_blobs(n_samples=n_samples, n_features=n_features, centers=n_centers, cluster_std=1.0, center_box=(-10.0, 10.0), shuffle=True, random_state=None)
x_groups = []
for i in range(n_groups):
    x_groups.append(xs[i*group_size: (i+1)*group_size])

def do_kmean(xs):
    data = []
    for x in xs:
        data.append((Vectors.dense(x.tolist()),))
    df = spark.createDataFrame(data, ["features"])
    num_clusters = random.randint(5, 10)
    kmeans = KMeans(k=num_clusters, maxIter=1, seed=1, featuresCol="features", predictionCol="prediction")
    model = kmeans.fit(df)
    return [num_clusters, kmeans.getK()]
from multiprocessing.pool import ThreadPool
tpool = ThreadPool(processes=8)
result = tpool.map(do_kmean, x_groups)
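The code above relies on the spark session that the pyspark shell (or an EMR notebook) creates automatically. If it is run as a standalone script instead, something like the following minimal sketch would be needed first (the app name is only a placeholder):

from pyspark.sql import SparkSession

# Build (or reuse) a SparkSession so that spark.createDataFrame is available.
spark = SparkSession.builder.appName("parallel-kmeans").getOrCreate()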
Result (input K vs. the K actually used by KMeans):
[[5, 9],
[8, 9],
[6, 8],
[10, 9],
[7, 9],
[9, 9],
[7, 9],
[9, 9],
[5, 5],
[5, 9],
[9, 7],
[9, 9],
[5, 7],
[10, 5],
[7, 7],
[7, 7],
[6, 6],
[10, 10],
[10, 10],
[5, 5]]
It looks like Spark is not thread/process safe here and is reading another thread's copy of K. Is there any Spark configuration that could cause this, or is this a Spark bug?
This is indeed a bug in Spark 2.0.2 and 2.1.0. I was able to reproduce it on my local machine with both versions. The bug is fixed in Spark 2.1.1.
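If upgrading is not an option right away, one possible workaround (a minimal sketch, not part of the original answer) is to serialize the construction of each KMeans estimator behind a lock, on the assumption that the race happens in shared state used while the estimator and its params are being built; the fit() calls themselves can still run concurrently. The do_kmean_locked name and the lock are illustrative only, and Vectors, random and spark are reused from the question's code above.

import threading

from pyspark.ml.clustering import KMeans

# One lock shared by all worker threads; assumes the corruption happens
# while the estimator (and its params) is being constructed.
_kmeans_lock = threading.Lock()

def do_kmean_locked(xs):
    data = [(Vectors.dense(x.tolist()),) for x in xs]
    df = spark.createDataFrame(data, ["features"])
    num_clusters = random.randint(5, 10)
    with _kmeans_lock:
        # Only one thread at a time builds its KMeans estimator.
        kmeans = KMeans(k=num_clusters, maxIter=1, seed=1,
                        featuresCol="features", predictionCol="prediction")
    # The Spark job triggered by fit() can still run in parallel with others.
    model = kmeans.fit(df)
    return [num_clusters, kmeans.getK()]

With that change the returned pairs should match, but the real fix is still to move to Spark 2.1.1 or later.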