如何使用 Pyspark Streaming 模块实现 RabbitMQ 消费者?

How to implement a RabbitMQ consumer using Pyspark Streaming module?

我有一个 Apache Spark 集群和一个 RabbitMQ 代理,我想使用 pyspark.streaming 模块使用消息并计算一些指标。

问题是我只找到了 this package,但是在 JavaScala 中实现了。除此之外,我没有在 Python.

中找到任何示例或桥接实现

我有一个使用 Pika 实现的消费者,但我不知道如何将负载传递给我的 StreamingContext

此解决方案使用来自 Spark Streaming

pika asynchronous consumer examplesocketTextStream 方法
  1. 下载示例并将其另存为 .py 文件
  2. 修改文件以使用您自己的 RabbitMQ 凭据和连接参数。在我的例子中,我不得不修改 Consumer class
  3. if __name__ == '__main__':下,我们需要打开一个套接字,其中HOSTPORT对应于您到Spark Streaming的TCP连接。我们必须将方法 sendall 从套接字保存到一个变量中,将其传递给 Consumer class

    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
      s.bind((HOST, PORT))
      s.listen(1)
      conn, addr = s.accept()
      dispatcher = conn.sendall #assigning sendall to dispatcher variable
    consumer = Consumer(dispatcher)
    try:
      consumer.run()
    except Exception as e:
      consumer.stop()
      s.close()
    
  4. 修改Consumer中的__init__方法,将dispatcher

    def __init__(self,dispatcher):
      self._connection = None
      self._channel = None
      self._closing = False
      self._consumer_tag = None
      self._url = amqp_url
      #new code
      self._dispatcher = dispatcher
    
  5. 在Consumer内部的方法on_message中调用self._dispatcher发送AMQP消息body

    def on_message(self, unused_channel, basic_deliver, properties, body):
      self._channel.basic_ack(basic_deliver.delivery_tag)
      try:
        # we need an '\n' at the each row Spark socketTextStream
        self._dispatcher(bytes(body.decode("utf-8")+'\n',"utf-8"))
      except Exception as e:
        raise
    
  6. 在Spark中,将ssc.socketTextStream(HOST, int(PORT))HOSTPORT对应到我们的TCP套接字。 Spark 将管理连接

  7. 运行 首先是消费者,然后是 Spark 应用程序

最后的评论:

  • 尝试 运行 你的消费者在不同的机器上而不是你的 Spark 机器上
  • 任何超过 10000 的端口都应该没问题。不要让内核打开一些随机端口
  • 平台:Linux Debian 7 和 8,以及 Ubuntu 14.04 和 16.04
  • 鼠兔版本 0.10.0
  • Python 版本 3.5.2
  • Spark 版本 1.6.1、1.6.2 和 2.0.0