Setting task slots with pyspark on an individual machine
I am trying to optimize an ML model using SparkTrials from the hyperopt library. I am running it on a machine with 16 cores, but when I run the code below with the number of cores set to 8, I get a warning that seems to indicate that only one core is being used.

SparkTrials accepts an argument spark_session, which in theory is where I set the number of cores.

Can anyone help me?

Thanks!
import os, shutil, tempfile
from hyperopt import fmin, tpe, hp, SparkTrials, STATUS_OK
import numpy as np
from sklearn import linear_model, datasets, model_selection
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").config('spark.local.dir', './').config("spark.executor.cores", 8).getOrCreate()

def gen_data(bytes):
    """
    Generates train/test data with target total bytes for a random regression problem.
    Returns (X_train, X_test, y_train, y_test).
    """
    n_features = 100
    n_samples = int(1.0 * bytes / (n_features + 1) / 8)
    X, y = datasets.make_regression(n_samples=n_samples, n_features=n_features, random_state=0)
    return model_selection.train_test_split(X, y, test_size=0.2, random_state=1)

def train_and_eval(data, alpha):
    """
    Trains a LASSO model using training data with the input alpha and evaluates it using test data.
    """
    X_train, X_test, y_train, y_test = data
    model = linear_model.Lasso(alpha=alpha)
    model.fit(X_train, y_train)
    loss = model.score(X_test, y_test)
    return {"loss": loss, "status": STATUS_OK}

def tune_alpha(objective):
    """
    Uses Hyperopt's SparkTrials to tune the input objective, which takes alpha as input and returns loss.
    Returns the best alpha found.
    """
    best = fmin(
        fn=objective,
        space=hp.uniform("alpha", 0.0, 10.0),
        algo=tpe.suggest,
        max_evals=8,
        trials=SparkTrials(parallelism=8, spark_session=spark))
    return best["alpha"]

data_small = gen_data(10 * 1024 * 1024)  # ~10MB

def objective_small(alpha):
    # For small data, you might reference it directly.
    return train_and_eval(data_small, alpha)

tune_alpha(objective_small)
Parallelism (8) is greater than the current total of Spark task slots
(1). If dynamic allocation is enabled, you might see more executors
allocated.
If you are in a cluster: Cores in Spark nomenclature are unrelated to the physical cores of your CPU here. With spark.executor.cores you specified the maximum number of threads (= tasks) each executor (you have one here) can run, which is 8. If you want to increase the number of executors, you have to use --num-executors on the command line or the spark.executor.instances configuration property in your code.
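For example, a minimal sketch of setting this from code rather than the command line (the executor counts here are illustrative, and spark.executor.instances only takes effect under a cluster manager such as YARN, not in local mode):

# Illustrative cluster configuration: 4 executors * 2 cores = 8 task slots.
spark = (SparkSession.builder
         .master("yarn")
         .config("spark.executor.instances", 4)
         .config("spark.executor.cores", 2)
         .getOrCreate())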
If you are in a YARN cluster, I suggest trying a configuration like this:
spark.conf.set("spark.dynamicAllocation.enabled", "true")
spark.conf.set("spark.executor.cores", 4)
spark.conf.set("spark.dynamicAllocation.minExecutors","2")
spark.conf.set("spark.dynamicAllocation.maxExecutors","10")
Please note that the options above are not available in local mode.
local: In local mode you have only one executor, and if you want to change its number of worker threads (one by default), you have to set your master like local[*] or local[16].
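Applied to the code in the question, a minimal sketch of the fix is to move the thread count into the master URL (the rest of the session setup stays the same):

# local[8] runs the single local executor with 8 worker threads,
# i.e. 8 task slots, matching SparkTrials(parallelism=8).
spark = (SparkSession.builder
         .master("local[8]")
         .config("spark.local.dir", "./")
         .getOrCreate())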