In Apache Spark, how do I set a worker/executor's environment variables?
My Spark program on EMR keeps failing with this error:
Caused by: javax.net.ssl.SSLPeerUnverifiedException: peer not authenticated
at sun.security.ssl.SSLSessionImpl.getPeerCertificates(SSLSessionImpl.java:421)
at org.apache.http.conn.ssl.AbstractVerifier.verify(AbstractVerifier.java:128)
at org.apache.http.conn.ssl.SSLSocketFactory.connectSocket(SSLSocketFactory.java:397)
at org.apache.http.impl.conn.DefaultClientConnectionOperator.openConnection(DefaultClientConnectionOperator.java:148)
at org.apache.http.impl.conn.AbstractPoolEntry.open(AbstractPoolEntry.java:149)
at org.apache.http.impl.conn.AbstractPooledConnAdapter.open(AbstractPooledConnAdapter.java:121)
at org.apache.http.impl.client.DefaultRequestDirector.tryConnect(DefaultRequestDirector.java:573)
at org.apache.http.impl.client.DefaultRequestDirector.execute(DefaultRequestDirector.java:425)
at org.apache.http.impl.client.AbstractHttpClient.execute(AbstractHttpClient.java:820)
at org.apache.http.impl.client.AbstractHttpClient.execute(AbstractHttpClient.java:754)
at org.jets3t.service.impl.rest.httpclient.RestStorageService.performRequest(RestStorageService.java:334)
at org.jets3t.service.impl.rest.httpclient.RestStorageService.performRequest(RestStorageService.java:281)
at org.jets3t.service.impl.rest.httpclient.RestStorageService.performRestHead(RestStorageService.java:942)
at org.jets3t.service.impl.rest.httpclient.RestStorageService.getObjectImpl(RestStorageService.java:2148)
at org.jets3t.service.impl.rest.httpclient.RestStorageService.getObjectDetailsImpl(RestStorageService.java:2075)
at org.jets3t.service.StorageService.getObjectDetails(StorageService.java:1093)
at org.jets3t.service.StorageService.getObjectDetails(StorageService.java:548)
at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.retrieveMetadata(Jets3tNativeFileSystemStore.java:172)
at sun.reflect.GeneratedMethodAccessor18.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:190)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:103)
at org.apache.hadoop.fs.s3native.$Proxy8.retrieveMetadata(Unknown Source)
at org.apache.hadoop.fs.s3native.NativeS3FileSystem.getFileStatus(NativeS3FileSystem.java:414)
at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1398)
at org.apache.hadoop.fs.s3native.NativeS3FileSystem.create(NativeS3FileSystem.java:341)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:906)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:887)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:784)
I did some research and found that, in low-security scenarios, this authentication can be disabled by setting the environment variable:
com.amazonaws.sdk.disableCertChecking=true
However, I can only set it via spark-submit.sh --conf, which only affects the driver, while most of the errors occur on the workers.
Is there a way to propagate it to the workers?
Thanks a lot.
Just stumbled upon something in the Spark documentation:
spark.executorEnv.[EnvironmentVariableName]
Add the environment variable specified by EnvironmentVariableName to
the Executor process. The user can specify multiple of these to set
multiple environment variables.
So in your case, I would set the Spark configuration option spark.executorEnv.com.amazonaws.sdk.disableCertChecking to true and see if that helps.
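With spark-submit, that could look like the following (an untested sketch; main.py stands in for your application entry point):

```shell
# Set the variable in each executor's environment at launch time
spark-submit \
  --conf spark.executorEnv.com.amazonaws.sdk.disableCertChecking=true \
  main.py
```

Note that com.amazonaws.sdk.disableCertChecking is normally read by the AWS SDK as a JVM system property rather than an environment variable, so you may also need to pass it as spark.executor.extraJavaOptions=-Dcom.amazonaws.sdk.disableCertChecking=true.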
Adding a bit more to the existing answers.
import pyspark

def get_spark_context(app_name):
    # Configure your application-specific settings
    conf = pyspark.SparkConf()
    conf.set('spark.app.name', app_name)
    # Set an environment variable for the executors; this must be done
    # before the SparkContext is created, or it has no effect
    conf.set('spark.executorEnv.SOME_ENVIRONMENT_VALUE', 'I_AM_PRESENT')
    # Initialize and return
    sc = pyspark.SparkContext.getOrCreate(conf=conf)
    return pyspark.SQLContext(sparkContext=sc)
The SOME_ENVIRONMENT_VALUE environment variable will then be available on the executors/workers.
In your Spark application, you can access it like this:
import os
some_environment_value = os.environ.get('SOME_ENVIRONMENT_VALUE')
Building on the other answers, here is a complete working example (PySpark 2.4.1). In this example, I force all workers to spawn only one thread per core for the Intel MKL kernel library:
import pyspark

conf = pyspark.conf.SparkConf().setAll([
    ('spark.executorEnv.OMP_NUM_THREADS', '1'),
    ('spark.workerEnv.OMP_NUM_THREADS', '1'),
    ('spark.executorEnv.OPENBLAS_NUM_THREADS', '1'),
    ('spark.workerEnv.OPENBLAS_NUM_THREADS', '1'),
    ('spark.executorEnv.MKL_NUM_THREADS', '1'),
    ('spark.workerEnv.MKL_NUM_THREADS', '1'),
])
spark = pyspark.sql.SparkSession.builder.config(conf=conf).getOrCreate()

# Print the current PySpark configuration to be sure
print("Current PySpark settings: ", spark.sparkContext._conf.getAll())
For Spark 2.4, @Amit Kushwaha's method did not work for me.
I tested:
1. Cluster mode
spark-submit --conf spark.executorEnv.DEBUG=1 --conf spark.appMasterEnv.DEBUG=1 --conf spark.yarn.appMasterEnv.DEBUG=1 --conf spark.yarn.executorEnv.DEBUG=1 main.py
2. Client mode
spark-submit --deploy-mode=client --conf spark.executorEnv.DEBUG=1 --conf spark.appMasterEnv.DEBUG=1 --conf spark.yarn.appMasterEnv.DEBUG=1 --conf spark.yarn.executorEnv.DEBUG=1 main.py
None of the above set the environment variable on the executor's system (that is, os.environ.get('DEBUG')
could not read it).
The only way that worked was to read it from spark.conf (note that spark-submit ignores --conf keys that do not start with "spark."):
Submit:
spark-submit --conf spark.DEBUG=1 main.py
Get the variable:
DEBUG = spark.conf.get('spark.DEBUG')
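The same idea can be taken one step further: once the driver has read the value from spark.conf, it can ship it to the executors through the task closure. A minimal sketch of that pattern (make_task and the '1' flag value are illustrative, not from the original post):

```python
# Sketch: instead of executor environment variables, read the setting on the
# driver (e.g. debug = spark.conf.get('spark.DEBUG', '0')) and capture it in
# a closure, which Spark serializes and ships to every executor with the task.

def make_task(debug_flag):
    # debug_flag is captured in the closure, so it travels with the task
    def task(record):
        if debug_flag == '1':
            return ('DEBUG', record)
        return ('NORMAL', record)
    return task

# On a real cluster you would do something like:
#   rdd.map(make_task(spark.conf.get('spark.DEBUG', '0')))
# Here we call the task locally just to show the pattern:
task = make_task('1')
print(task('hello'))  # prints ('DEBUG', 'hello')
```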