Spark Streaming processing from multiple RabbitMQ queues in parallel

I am trying to set up Spark Streaming for multiple RabbitMQ queues. As configured below, I have 2 workers, each with one core and 2GB of memory. The problem is that when I set conf.set("spark.cores.max","2"), the streaming job does not process any data; it just keeps queueing up jobs. But as soon as I set conf.set("spark.cores.max","3"), it starts processing. I don't understand why. Also, how should I process data from the two queues in parallel? My code and configuration settings are below.

spark-env.sh:

SPARK_WORKER_MEMORY=2g 
SPARK_WORKER_INSTANCES=1
SPARK_WORKER_CORES=1

Scala code:

    val rabbitParams = Map(
      "storageLevel" -> "MEMORY_AND_DISK_SER_2",
      "queueName" -> config.getString("queueName"),
      "host" -> config.getString("QueueHost"),
      "exchangeName" -> config.getString("exchangeName"),
      "routingKeys" -> config.getString("routingKeys"))
    val receiverStream = RabbitMQUtils.createStream(ssc, rabbitParams)
    receiverStream.start()

    val predRabbitParams = Map(
      "storageLevel" -> "MEMORY_AND_DISK_SER_2",
      "queueName" -> config.getString("queueName1"),
      "host" -> config.getString("QueueHost"),
      "exchangeName" -> config.getString("exchangeName1"),
      "routingKeys" -> config.getString("routingKeys1"))
    val predReceiverStream = RabbitMQUtils.createStream(ssc, predRabbitParams)
    predReceiverStream.start()

This behavior is explained in the Spark Streaming Guide: each receiver is a long-running task that occupies one thread (core) for its entire lifetime.

If the number of available cores is less than or equal to the number of receivers, there are no resources left for processing tasks. Here, with spark.cores.max set to 2, your two receivers occupy both cores and nothing remains to run the batch jobs; with 3 cores, one stays free for processing:

the number of cores allocated to the Spark Streaming application must be more than the number of receivers. Otherwise the system will receive data, but not be able to process it.
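
As a minimal sketch of the fix, assuming the same RabbitMQUtils API as in the question and a placeholder per-batch handler: allocate at least one more core than the number of receivers, and merge the two streams with union so both queues are consumed in parallel. Note that ssc.start() is what actually launches the receivers, so the explicit per-stream start() calls in the question should not be needed.

    // Two receivers occupy two cores; reserve at least one more for processing.
    conf.set("spark.cores.max", "3")

    val stream1 = RabbitMQUtils.createStream(ssc, rabbitParams)
    val stream2 = RabbitMQUtils.createStream(ssc, predRabbitParams)

    // union() merges the two DStreams so each batch contains data from both
    // queues while the two receivers keep reading in parallel.
    val combined = stream1.union(stream2)
    combined.foreachRDD { rdd =>
      // placeholder processing; replace with the real per-batch logic
      rdd.foreach(record => println(record))
    }

    ssc.start()            // launches the receivers and the streaming computation
    ssc.awaitTermination()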