如何使用 Pyspark Streaming 模块实现 RabbitMQ 消费者?
How to implement a RabbitMQ consumer using Pyspark Streaming module?
我有一个 Apache Spark 集群和一个 RabbitMQ 代理,我想使用 pyspark.streaming
模块使用消息并计算一些指标。
问题是我只找到了 this package,但是在 Java 和 Scala 中实现了。除此之外,我没有在 Python.
中找到任何示例或桥接实现
我有一个使用 Pika 实现的消费者,但我不知道如何将负载传递给我的 StreamingContext
。
此解决方案使用来自 Spark Streaming
的 pika asynchronous consumer example 和 socketTextStream
方法
- 下载示例并将其另存为
.py
文件
- 修改文件以使用您自己的 RabbitMQ 凭据和连接参数。在我的例子中,我不得不修改
Consumer
class
在if __name__ == '__main__':
下,我们需要打开一个套接字,其中HOST
和PORT
对应于您到Spark Streaming的TCP连接。我们必须将方法 sendall
从套接字保存到一个变量中,将其传递给 Consumer
class
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.bind((HOST, PORT))
s.listen(1)
conn, addr = s.accept()
dispatcher = conn.sendall #assigning sendall to dispatcher variable
consumer = Consumer(dispatcher)
try:
consumer.run()
except Exception as e:
consumer.stop()
s.close()
修改Consumer中的__init__
方法,将dispatcher
def __init__(self,dispatcher):
self._connection = None
self._channel = None
self._closing = False
self._consumer_tag = None
self._url = amqp_url
#new code
self._dispatcher = dispatcher
在Consumer内部的方法on_message
中调用self._dispatcher
发送AMQP消息body
def on_message(self, unused_channel, basic_deliver, properties, body):
self._channel.basic_ack(basic_deliver.delivery_tag)
try:
# we need an '\n' at the each row Spark socketTextStream
self._dispatcher(bytes(body.decode("utf-8")+'\n',"utf-8"))
except Exception as e:
raise
在Spark中,将ssc.socketTextStream(HOST, int(PORT))
与HOST
和PORT
对应到我们的TCP套接字。 Spark 将管理连接
运行 首先是消费者,然后是 Spark 应用程序
最后的评论:
- 尝试 运行 你的消费者在不同的机器上而不是你的 Spark 机器上
- 任何超过 10000 的端口都应该没问题。不要让内核打开一些随机端口
- 平台:Linux Debian 7 和 8,以及 Ubuntu 14.04 和 16.04
- 鼠兔版本 0.10.0
- Python 版本 3.5.2
- Spark 版本 1.6.1、1.6.2 和 2.0.0
我有一个 Apache Spark 集群和一个 RabbitMQ 代理,我想使用 pyspark.streaming
模块使用消息并计算一些指标。
问题是我只找到了 this package,但是在 Java 和 Scala 中实现了。除此之外,我没有在 Python.
中找到任何示例或桥接实现我有一个使用 Pika 实现的消费者,但我不知道如何将负载传递给我的 StreamingContext
。
此解决方案使用来自 Spark Streaming
的 pika asynchronous consumer example 和socketTextStream
方法
- 下载示例并将其另存为
.py
文件 - 修改文件以使用您自己的 RabbitMQ 凭据和连接参数。在我的例子中,我不得不修改
Consumer
class 在
if __name__ == '__main__':
下,我们需要打开一个套接字,其中HOST
和PORT
对应于您到Spark Streaming的TCP连接。我们必须将方法sendall
从套接字保存到一个变量中,将其传递给Consumer
classwith socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: s.bind((HOST, PORT)) s.listen(1) conn, addr = s.accept() dispatcher = conn.sendall #assigning sendall to dispatcher variable consumer = Consumer(dispatcher) try: consumer.run() except Exception as e: consumer.stop() s.close()
修改Consumer中的
__init__
方法,将dispatcher
def __init__(self,dispatcher): self._connection = None self._channel = None self._closing = False self._consumer_tag = None self._url = amqp_url #new code self._dispatcher = dispatcher
在Consumer内部的方法
on_message
中调用self._dispatcher
发送AMQP消息body
def on_message(self, unused_channel, basic_deliver, properties, body): self._channel.basic_ack(basic_deliver.delivery_tag) try: # we need an '\n' at the each row Spark socketTextStream self._dispatcher(bytes(body.decode("utf-8")+'\n',"utf-8")) except Exception as e: raise
在Spark中,将
ssc.socketTextStream(HOST, int(PORT))
与HOST
和PORT
对应到我们的TCP套接字。 Spark 将管理连接运行 首先是消费者,然后是 Spark 应用程序
最后的评论:
- 尝试 运行 你的消费者在不同的机器上而不是你的 Spark 机器上
- 任何超过 10000 的端口都应该没问题。不要让内核打开一些随机端口
- 平台:Linux Debian 7 和 8,以及 Ubuntu 14.04 和 16.04
- 鼠兔版本 0.10.0
- Python 版本 3.5.2
- Spark 版本 1.6.1、1.6.2 和 2.0.0