Kafka 和 Pyspark 集成
Kafka and Pyspark Integration
我对大数据很幼稚,我正在尝试将kafka连接到spark。
这是我的制作人代码
import os
import sys
import pykafka
def get_text():
## This block generates my required text.
text_as_bytes=text.encode(text)
producer.produce(text_as_bytes)
if __name__ == "__main__":
client = pykafka.KafkaClient("localhost:9092")
print ("topics",client.topics)
producer = client.topics[b'imagetext'].get_producer()
get_text()
当我这样做时,这是在控制台消费者上打印我生成的文本
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic imagetext --from-beginning
现在我希望使用 Spark 使用此文本,这是我的 Jupyter 代码
import findspark
findspark.init()
import os
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import json
os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars /spark-2.1.1-bin-hadoop2.6/spark-streaming-kafka-0-8-assembly_2.11-2.1.0.jar pyspark-shell'
conf = SparkConf().setMaster("local[2]").setAppName("Streamer")
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc,5)
print('ssc =================== {} {}')
kstream = KafkaUtils.createDirectStream(ssc, topics = ['imagetext'],
kafkaParams = {"metadata.broker.list": 'localhost:9092'})
print('contexts =================== {} {}')
lines = kstream.map(lambda x: x[1])
lines.pprint()
ssc.start()
ssc.awaitTermination()
ssc.stop(stopGraceFully = True)
但这在我的 Jupyter 上产生的输出是
Time: 2018-02-21 15:03:25
-------------------------------------------
-------------------------------------------
Time: 2018-02-21 15:03:30
-------------------------------------------
不是我的控制台消费者上的文本..
请帮忙,无法找出错误。
我找到了另一个解决方案。虽然将 get_text()
放入循环中的解决方案有效,但这不是正确的解决方案。您的数据在 Kafka 中发送时不是连续的。因此,Spark Streaming 不应该以这种方式获取它。
Kafka-python 库提供了 get(timeout)
功能,以便 Kafka 等待请求。
producer.send(topic,data).get(timeout=10)
由于您使用的是pykafka
,我不确定它是否有效。尽管如此,您仍然可以尝试一次并且不要将 get_text()
放入循环中。
只需将消费者中的端口从 9092 更改为 2181,因为它是 Zookeeper。从 producer 端,它必须连接到端口号为 9092 的 Kafka。从 streamer 端,它必须连接到端口号为 2181 的 Zookeeper。
我对大数据很幼稚,我正在尝试将kafka连接到spark。 这是我的制作人代码
import os
import sys
import pykafka
def get_text():
## This block generates my required text.
text_as_bytes=text.encode(text)
producer.produce(text_as_bytes)
if __name__ == "__main__":
client = pykafka.KafkaClient("localhost:9092")
print ("topics",client.topics)
producer = client.topics[b'imagetext'].get_producer()
get_text()
当我这样做时,这是在控制台消费者上打印我生成的文本 bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic imagetext --from-beginning
现在我希望使用 Spark 使用此文本,这是我的 Jupyter 代码
import findspark
findspark.init()
import os
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import json
os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars /spark-2.1.1-bin-hadoop2.6/spark-streaming-kafka-0-8-assembly_2.11-2.1.0.jar pyspark-shell'
conf = SparkConf().setMaster("local[2]").setAppName("Streamer")
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc,5)
print('ssc =================== {} {}')
kstream = KafkaUtils.createDirectStream(ssc, topics = ['imagetext'],
kafkaParams = {"metadata.broker.list": 'localhost:9092'})
print('contexts =================== {} {}')
lines = kstream.map(lambda x: x[1])
lines.pprint()
ssc.start()
ssc.awaitTermination()
ssc.stop(stopGraceFully = True)
但这在我的 Jupyter 上产生的输出是
Time: 2018-02-21 15:03:25
-------------------------------------------
-------------------------------------------
Time: 2018-02-21 15:03:30
-------------------------------------------
不是我的控制台消费者上的文本.. 请帮忙,无法找出错误。
我找到了另一个解决方案。虽然将 get_text()
放入循环中的解决方案有效,但这不是正确的解决方案。您的数据在 Kafka 中发送时不是连续的。因此,Spark Streaming 不应该以这种方式获取它。
Kafka-python 库提供了 get(timeout)
功能,以便 Kafka 等待请求。
producer.send(topic,data).get(timeout=10)
由于您使用的是pykafka
,我不确定它是否有效。尽管如此,您仍然可以尝试一次并且不要将 get_text()
放入循环中。
只需将消费者中的端口从 9092 更改为 2181,因为它是 Zookeeper。从 producer 端,它必须连接到端口号为 9092 的 Kafka。从 streamer 端,它必须连接到端口号为 2181 的 Zookeeper。