Spark Structured Stream Executors 奇怪的行为
Spark Structured Stream Executors weird behavior
使用 Spark Structured Stream,配合 Cloudera 解决方案
我正在使用 3 个执行程序,但是当我启动应用程序时,使用的执行程序只有一个。
如何使用多个执行器?
让我给你更多的信息。
这是我的参数:
命令启动:
spark2-submit --master yarn \
--deploy-mode cluster \
--conf spark.ui.port=4042 \
--conf spark.eventLog.enabled=false \
--conf spark.dynamicAllocation.enabled=false \
--conf spark.streaming.backpressure.enabled=true \
--conf spark.streaming.kafka.consumer.poll.ms=512 \
--num-executors 3 \
--executor-cores 3 \
--executor-memory 2g \
--jars /data/test/spark-avro_2.11-3.2.0.jar,/data/test/spark-streaming-kafka-0-10_2.11-2.1.0.cloudera1.jar,/data/test/spark-sql-kafka-0-10_2.11-2.1.0.cloudera1.jar \
--class com.test.Hello /data/test/Hello.jar
代码:
val lines = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", <topic_list:9092>)
.option("subscribe", <topic_name>)
.option("group.id", <consumer_group_id>)
.load()
.select($"value".as[Array[Byte]], $"timestamp")
.map((c) => { .... })
val query = lines
.writeStream
.format("csv")
.option("path", <outputPath>)
.option("checkpointLocation", <checkpointLocationPath>)
.start()
query.awaitTermination()
SparkUI 中的结果:
SparkUI Image
我期望所有执行者都在工作。
有什么建议吗?
谢谢
保罗
看起来你的配置没有问题,只是你使用的分区可能只有一个。您需要增加 kafka 生产者中的分区。通常,分区数大约是执行程序数的 3-4 倍。
如果您不想触及生产者代码,可以在应用 map 方法之前执行 repartition(3) 来解决这个问题,这样每个执行者都在其自己的逻辑分区上工作。
如果您仍然希望明确提及每个执行者获得的工作,您可以使用 mapPerPartion 方法。
使用 Spark Structured Stream,配合 Cloudera 解决方案 我正在使用 3 个执行程序,但是当我启动应用程序时,使用的执行程序只有一个。 如何使用多个执行器?
让我给你更多的信息。 这是我的参数:
命令启动:
spark2-submit --master yarn \
--deploy-mode cluster \
--conf spark.ui.port=4042 \
--conf spark.eventLog.enabled=false \
--conf spark.dynamicAllocation.enabled=false \
--conf spark.streaming.backpressure.enabled=true \
--conf spark.streaming.kafka.consumer.poll.ms=512 \
--num-executors 3 \
--executor-cores 3 \
--executor-memory 2g \
--jars /data/test/spark-avro_2.11-3.2.0.jar,/data/test/spark-streaming-kafka-0-10_2.11-2.1.0.cloudera1.jar,/data/test/spark-sql-kafka-0-10_2.11-2.1.0.cloudera1.jar \
--class com.test.Hello /data/test/Hello.jar
代码:
val lines = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", <topic_list:9092>)
.option("subscribe", <topic_name>)
.option("group.id", <consumer_group_id>)
.load()
.select($"value".as[Array[Byte]], $"timestamp")
.map((c) => { .... })
val query = lines
.writeStream
.format("csv")
.option("path", <outputPath>)
.option("checkpointLocation", <checkpointLocationPath>)
.start()
query.awaitTermination()
SparkUI 中的结果: SparkUI Image
我期望所有执行者都在工作。
有什么建议吗?
谢谢 保罗
看起来你的配置没有问题,只是你使用的分区可能只有一个。您需要增加 kafka 生产者中的分区。通常,分区数大约是执行程序数的 3-4 倍。
如果您不想触及生产者代码,可以在应用 map 方法之前执行 repartition(3) 来解决这个问题,这样每个执行者都在其自己的逻辑分区上工作。
如果您仍然希望明确提及每个执行者获得的工作,您可以使用 mapPerPartion 方法。