SparkStreaming, RabbitMQ and MQTT in python using pika
To make things difficult, I would like to consume messages from a rabbitMQ queue. Now I know there is a plugin for MQTT on rabbit (https://www.rabbitmq.com/mqtt.html).
However, I cannot seem to make an example work where Spark consumes a message that has been produced from pika.
For example, I am using the simple wordcount.py program here (https://spark.apache.org/docs/1.2.0/streaming-programming-guide.html) to see if I can see messages from the following producer:
import sys
import pika
import json
import future
import pprofile

def sendJson(json):
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()

    channel.queue_declare(queue='analytics', durable=True)
    channel.queue_bind(exchange='analytics_exchange',
                       queue='analytics')

    channel.basic_publish(exchange='analytics_exchange',
                          routing_key='analytics',
                          body=json)
    connection.close()

if __name__ == "__main__":
    with open(sys.argv[1], 'r') as json_file:
        sendJson(json_file.read())
The Spark Streaming consumer is as follows:
import sys
import operator
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.mqtt import MQTTUtils
sc = SparkContext(appName="SS")
sc.setLogLevel("ERROR")
ssc = StreamingContext(sc, 1)
ssc.checkpoint("checkpoint")
#ssc.setLogLevel("ERROR")
#RabbitMQ
"""EXCHANGE = 'analytics_exchange'
EXCHANGE_TYPE = 'direct'
QUEUE = 'analytics'
ROUTING_KEY = 'analytics'
RESPONSE_ROUTING_KEY = 'analytics-response'
"""
brokerUrl = "localhost:5672" # "tcp://iot.eclipse.org:1883"
topic = "analytics"
mqttStream = MQTTUtils.createStream(ssc, brokerUrl, topic)
#dummy functions - nothing interesting...
words = mqttStream.flatMap(lambda line: line.split(" "))
pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda x, y: x + y)
wordCounts.pprint()
ssc.start()
ssc.awaitTermination()
However, unlike the simple wordcount example, I cannot get this to work, and I get the following error:
16/06/16 17:41:35 ERROR Executor: Exception in task 0.0 in stage 7.0 (TID 8)
java.lang.NullPointerException
at org.eclipse.paho.client.mqttv3.MqttConnectOptions.validateURI(MqttConnectOptions.java:457)
at org.eclipse.paho.client.mqttv3.MqttAsyncClient.<init>(MqttAsyncClient.java:273)
So my question is, what should the settings for MQTTUtils.createStream(ssc, brokerUrl, topic) be in order to listen to the queue, and are there any fuller examples of how these map onto RabbitMQ?
I am running my consumer code with: ./bin/spark-submit ../../bb/code/skunkworks/sparkMQTTRabbit.py
I have updated the producer code as per one comment's suggestion to use TCP parameters:
url_location = 'tcp://localhost'
url = os.environ.get('', url_location)
params = pika.URLParameters(url)
connection = pika.BlockingConnection(params)
and the Spark Streaming code to:
brokerUrl = "tcp://127.0.0.1:5672"
topic = "#" #all messages
mqttStream = MQTTUtils.createStream(ssc, brokerUrl, topic)
records = mqttStream.flatMap(lambda line: json.loads(line))
count = records.map(lambda rec: len(rec))
total = count.reduce(lambda a, b: a + b)
total.pprint()
According to the MqttAsyncClient Javadoc, the server URI must have one of the following schemes: tcp://, ssl://, or local://. You need to change your brokerUrl above to use one of these schemes. For more information, see the source of MqttAsyncClient.
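For instance, a corrected setup might look like this (a minimal sketch, assuming a local broker with the MQTT plugin listening on the default MQTT port 1883):

brokerUrl = "tcp://localhost:1883"  # tcp:// scheme plus the MQTT port, not AMQP's 5672
topic = "analytics"
mqttStream = MQTTUtils.createStream(ssc, brokerUrl, topic)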
It looks like you are using the wrong port number. Assuming that:
- you have a local instance of RabbitMQ running with default settings, you've enabled the MQTT plugin (rabbitmq-plugins enable rabbitmq_mqtt) and restarted the RabbitMQ server,
- you included spark-streaming-mqtt when executing spark-submit / pyspark (either with packages or jars / driver-class-path),
you can connect using TCP at tcp://localhost:1883. You also have to remember that MQTT uses the amq.topic exchange.
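As a quick sanity check before involving Spark, you can confirm the MQTT listener is reachable with a plain MQTT client. This is a sketch using the paho-mqtt package (pip install paho-mqtt), which is not part of the original setup:

import paho.mqtt.publish as publish

# One-shot publish to the MQTT listener; raises ConnectionRefusedError
# if the rabbitmq_mqtt plugin is not enabled or the port is wrong.
publish.single("hello", "ping", hostname="localhost", port=1883)
print("MQTT listener is reachable")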
Quick start:
Create a Dockerfile with the following content:
FROM rabbitmq:3-management
RUN rabbitmq-plugins enable rabbitmq_mqtt
Build the Docker image:
docker build -t rabbit_mqtt .
Start the image and wait until the server is ready:
docker run -p 15672:15672 -p 5672:5672 -p 1883:1883 rabbit_mqtt
Create producer.py with the following content:
import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

# Note: pika >= 1.0 renamed the type argument to exchange_type
channel.exchange_declare(exchange='amq.topic',
                         type='topic', durable=True)

for i in range(1000):
    channel.basic_publish(
        exchange='amq.topic',  # amq.topic as exchange
        routing_key='hello',   # Routing key used by producer
        body='Hello World {0}'.format(i)
    )
    time.sleep(3)

connection.close()
Start the producer:
python producer.py
and visit the management console (http://127.0.0.1:15672/#/exchanges/%2F/amq.topic) to see that messages are being received.
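If you prefer the command line over the management UI, you can also inspect the broker from inside the container (a sketch; the docker ps --filter lookup assumes only one rabbit_mqtt container is running):

docker exec $(docker ps -q --filter ancestor=rabbit_mqtt) rabbitmqctl list_exchanges
docker exec $(docker ps -q --filter ancestor=rabbit_mqtt) rabbitmqctl list_queues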
Create consumer.py with the following content:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.mqtt import MQTTUtils

sc = SparkContext()
ssc = StreamingContext(sc, 10)

mqttStream = MQTTUtils.createStream(
    ssc,
    "tcp://localhost:1883",  # Note both port number and protocol
    "hello"                  # The same routing key as used by producer
)
mqttStream.count().pprint()
ssc.start()
ssc.awaitTermination()
ssc.stop()
Download the dependency (adjust the Scala version to the one used to build Spark, and the Spark version accordingly):
mvn dependency:get -Dartifact=org.apache.spark:spark-streaming-mqtt_2.11:1.6.1
Make sure that SPARK_HOME and PYTHONPATH point to the correct directories.
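For example (a sketch; the install path and the py4j zip name are assumptions that vary with your installation and Spark version):

export SPARK_HOME=/opt/spark   # wherever your Spark build lives
export PYTHONPATH=$SPARK_HOME/python:$SPARK_HOME/python/lib/py4j-0.9-src.zip:$PYTHONPATH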
Submit consumer.py (adjusting versions as before):
spark-submit --packages org.apache.spark:spark-streaming-mqtt_2.11:1.6.1 consumer.py
If you followed all the steps, you should see Hello World messages in the Spark log.