Spark 3.x 在 Python 中与 Kafka 集成
Spark 3.x Integration with Kafka in Python
带有 spark-streaming 的 Kafka 抛出错误:
from pyspark.streaming.kafka import KafkaUtils ImportError: No module named kafka
我已经设置了一个 kafka broker 和一个 spark 工作环境,其中有一个 master 和一个 worker。
import os
os.environ['PYSPARK_PYTHON'] = '/usr/bin/python2.7'
import findspark
findspark.init('/usr/spark/spark-3.0.0-preview2-bin-hadoop2.7')
import pyspark
import sys
from pyspark import SparkConf,SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
if __name__=="__main__":
sc = SparkContext(appName="SparkStreamAISfromKAFKA")
sc.setLogLevel("WARN")
ssc = StreamingContext(sc,1)
kvs = KafkaUtils.createStream(ssc,"my-kafka-broker","raw-event-streaming-consumer",{'enriched_ais_messages':1})
lines = kvs.map(lambda x: x[1])
lines.count().map(lambda x: 'Messages AIS: %s' % x).pprint()
ssc.start()
ssc.awaitTermination()
我假设错误是缺少与 kafka ans 相关的内容,特别是与版本相关的内容。有人可以帮忙吗?
spark-版本:版本 3.0.0-preview2
我执行:
/usr/spark/spark-3.0.0-preview2-bin-hadoop2.7/bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.1 --jars spark-streaming-kafka-0-10_2.11 spark_streamer.py spark://mysparkip:7077
根据Spark Streaming + Kafka Integration Guide:
"Kafka 0.8 support is deprecated as of Spark 2.3.0."
此外,下面的屏幕截图显示 Python 不支持 Kafka 0.10(及更高版本)。
在您的情况下,您必须使用 Spark 2.4 才能获取代码 运行。
PySpark 支持结构化流
如果您计划使用最新版本的 Spark(例如 3.x)并且仍想在 Python 中将 Spark 与 Kafka 集成,您可以使用结构化流。您将在 Structured Streaming + Kafka Integration Guide (Kafka broker version 0.10.0 or higher):
中找到有关如何使用 Python API 的详细说明
从 Kafka 读取数据
# Subscribe to 1 topic
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("subscribe", "topic1") \
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
正在向 Kafka 写入数据
# Write key-value data from a DataFrame to a specific Kafka topic specified in an option
ds = df \
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
.writeStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("topic", "topic1") \
.start()
带有 spark-streaming 的 Kafka 抛出错误:
from pyspark.streaming.kafka import KafkaUtils ImportError: No module named kafka
我已经设置了一个 kafka broker 和一个 spark 工作环境,其中有一个 master 和一个 worker。
import os
os.environ['PYSPARK_PYTHON'] = '/usr/bin/python2.7'
import findspark
findspark.init('/usr/spark/spark-3.0.0-preview2-bin-hadoop2.7')
import pyspark
import sys
from pyspark import SparkConf,SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
if __name__=="__main__":
sc = SparkContext(appName="SparkStreamAISfromKAFKA")
sc.setLogLevel("WARN")
ssc = StreamingContext(sc,1)
kvs = KafkaUtils.createStream(ssc,"my-kafka-broker","raw-event-streaming-consumer",{'enriched_ais_messages':1})
lines = kvs.map(lambda x: x[1])
lines.count().map(lambda x: 'Messages AIS: %s' % x).pprint()
ssc.start()
ssc.awaitTermination()
我假设错误是缺少与 kafka ans 相关的内容,特别是与版本相关的内容。有人可以帮忙吗?
spark-版本:版本 3.0.0-preview2
我执行:
/usr/spark/spark-3.0.0-preview2-bin-hadoop2.7/bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.1 --jars spark-streaming-kafka-0-10_2.11 spark_streamer.py spark://mysparkip:7077
根据Spark Streaming + Kafka Integration Guide:
"Kafka 0.8 support is deprecated as of Spark 2.3.0."
此外,下面的屏幕截图显示 Python 不支持 Kafka 0.10(及更高版本)。
在您的情况下,您必须使用 Spark 2.4 才能获取代码 运行。
PySpark 支持结构化流
如果您计划使用最新版本的 Spark(例如 3.x)并且仍想在 Python 中将 Spark 与 Kafka 集成,您可以使用结构化流。您将在 Structured Streaming + Kafka Integration Guide (Kafka broker version 0.10.0 or higher):
中找到有关如何使用 Python API 的详细说明从 Kafka 读取数据
# Subscribe to 1 topic
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("subscribe", "topic1") \
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
正在向 Kafka 写入数据
# Write key-value data from a DataFrame to a specific Kafka topic specified in an option
ds = df \
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
.writeStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("topic", "topic1") \
.start()