使用 pyspark 运行 本地 spark 集群上的作业
Using pyspark to run a job on premises spark cluster
我有一个小型的本地 Spark 3.2.0 集群,其中一台机器是主机,另外两台机器是从机。集群部署在“裸机”上,当我从主机上 运行 pyspark 时一切正常。
当我尝试从另一台机器 运行 任何东西时,就会出现问题。这是我的代码:
import pandas as pd
from datetime import datetime
from pyspark.sql import SparkSession, functions
spark = SparkSession.builder.appName("extrair_comex").config("spark.executor.memory", "1g").master("spark://srvsparkm-dev:7077").getOrCreate()
link = 'https://www.stats.govt.nz/assets/Uploads/International-trade/International-trade-September-2021-quarter/Download-data/overseas-trade-indexes-September-2021-quarter-provisional-csv.csv'
arquivo = pd.read_csv(link)
df_spark = spark.createDataFrame(arquivo.astype(str))
df_spark.write.mode('overwrite').parquet(f'hdfs://srvsparkm-dev:9000/lnd/arquivo_extraido_comex.parquet')
其中“srvsparkm-dev”是 spark 主 IP 的别名。
检查“extrair_comex”作业的日志,我看到了:
Spark 执行器命令:
Spark 执行器命令:“/usr/lib/jvm/java-8-openjdk-amd64/bin/java”“-cp”“/home/spark/spark/conf/:/home/spark/spark/jars/*”“-Xmx1024M”“-Dspark.driver.port=38571” “org.apache.spark.executor.CoarseGrainedExecutorBackend” “--driver-url” “spark://CoarseGrainedScheduler@srvairflowcelery-dev:38571” “--executor-id” “157” “--hostname” “srvsparksl1-dev " "--cores" "2" "--app-id" "app-20220204183041-0031" "--worker-url" "spark://Worker@srvsparksl1-dev:37383"
错误:
其中“srvairflowcelery-dev”是 pyspark 脚本所在的机器运行ning。
Caused by: java.io.IOException: Failed to connect to srvairflowcelery-dev/xx.xxx.xxx.xx:38571
其中 xx.xxx.xxx.xx 是 srvairflowcelery-dev 的 IP。
在我看来,主人正在将任务分配给客户端运行,这就是它失败的原因。
我该怎么办?我不能从另一台机器提交作业吗?
我解决了这个问题。问题是 srvairflowcelery 在 docker 上,所以只有一些端口是打开的。除此之外,spark master 尝试在驱动程序的随机端口 (srvairflowcelery) 上进行通信,因此关闭某些端口是一个问题。
我所做的是:
- 打开了我的 airflow worker 的一系列端口:
airflow-worker:
<<: *airflow-common
command: celery worker
hostname: ${HOSTNAME}
ports:
- 8793:8793
- "51800-51900:51800-51900"
- 在我的 pyspark 代码固定端口上设置:
spark = SparkSession.builder.appName("extrair_comex_sb") \
.config("spark.executor.memory", "1g") \
.config("spark.driver.port", "51810") \
.config("spark.fileserver.port", "51811") \
.config("spark.broadcast.port", "51812") \
.config("spark.replClassServer.port", "51813") \
.config("spark.blockManager.port", "51814") \
.config("spark.executor.port", "51815") \
.master("spark://srvsparkm-dev:7077") \
.getOrCreate()
这解决了问题。
我有一个小型的本地 Spark 3.2.0 集群,其中一台机器是主机,另外两台机器是从机。集群部署在“裸机”上,当我从主机上 运行 pyspark 时一切正常。
当我尝试从另一台机器 运行 任何东西时,就会出现问题。这是我的代码:
import pandas as pd
from datetime import datetime
from pyspark.sql import SparkSession, functions
spark = SparkSession.builder.appName("extrair_comex").config("spark.executor.memory", "1g").master("spark://srvsparkm-dev:7077").getOrCreate()
link = 'https://www.stats.govt.nz/assets/Uploads/International-trade/International-trade-September-2021-quarter/Download-data/overseas-trade-indexes-September-2021-quarter-provisional-csv.csv'
arquivo = pd.read_csv(link)
df_spark = spark.createDataFrame(arquivo.astype(str))
df_spark.write.mode('overwrite').parquet(f'hdfs://srvsparkm-dev:9000/lnd/arquivo_extraido_comex.parquet')
其中“srvsparkm-dev”是 spark 主 IP 的别名。
检查“extrair_comex”作业的日志,我看到了:
Spark 执行器命令:
Spark 执行器命令:“/usr/lib/jvm/java-8-openjdk-amd64/bin/java”“-cp”“/home/spark/spark/conf/:/home/spark/spark/jars/*”“-Xmx1024M”“-Dspark.driver.port=38571” “org.apache.spark.executor.CoarseGrainedExecutorBackend” “--driver-url” “spark://CoarseGrainedScheduler@srvairflowcelery-dev:38571” “--executor-id” “157” “--hostname” “srvsparksl1-dev " "--cores" "2" "--app-id" "app-20220204183041-0031" "--worker-url" "spark://Worker@srvsparksl1-dev:37383"
错误:
其中“srvairflowcelery-dev”是 pyspark 脚本所在的机器运行ning。
Caused by: java.io.IOException: Failed to connect to srvairflowcelery-dev/xx.xxx.xxx.xx:38571
其中 xx.xxx.xxx.xx 是 srvairflowcelery-dev 的 IP。
在我看来,主人正在将任务分配给客户端运行,这就是它失败的原因。 我该怎么办?我不能从另一台机器提交作业吗?
我解决了这个问题。问题是 srvairflowcelery 在 docker 上,所以只有一些端口是打开的。除此之外,spark master 尝试在驱动程序的随机端口 (srvairflowcelery) 上进行通信,因此关闭某些端口是一个问题。
我所做的是:
- 打开了我的 airflow worker 的一系列端口:
airflow-worker:
<<: *airflow-common
command: celery worker
hostname: ${HOSTNAME}
ports:
- 8793:8793
- "51800-51900:51800-51900"
- 在我的 pyspark 代码固定端口上设置:
spark = SparkSession.builder.appName("extrair_comex_sb") \
.config("spark.executor.memory", "1g") \
.config("spark.driver.port", "51810") \
.config("spark.fileserver.port", "51811") \
.config("spark.broadcast.port", "51812") \
.config("spark.replClassServer.port", "51813") \
.config("spark.blockManager.port", "51814") \
.config("spark.executor.port", "51815") \
.master("spark://srvsparkm-dev:7077") \
.getOrCreate()
这解决了问题。