在结构化流中将数据帧传递给 UDF 时出错
Error while passing dataframe to UDF in Structured Streaming
我在Spark Structured Streaming中读取来自Kafka的事件,需要一个一个地处理事件并写入redis。我为此编写了一个 UDF,但它给了我 spark 上下文错误。
conf = SparkConf()\
.setAppName(spark_app_name)\
.setMaster(spark_master_url)\
.set("spark.redis.host", "redis")\
.set("spark.redis.port", "6379")\
.set("spark.redis.auth", "abc")
spark = SparkSession.builder\
.config(conf=conf)\
.getOrCreate()
def func(element, event, timestamp):
#redis i/o
pass
schema = ArrayType(StructType(
[
StructField("element_id", StringType()),
StructField("event_name", StringType()),
StructField("event_time", StringType())
]
))
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "kafka:9092") \
.option("subscribe", topic) \
.load()
#.option("includeTimestamp", value = True)\
ds = df.selectExpr(("CAST(value AS STRING)"))\
.withColumn("value", explode(from_json("value", schema)))
filter_func = udf(func, ArrayType(StringType()))
ds = ds.withColumn("column_name", filter_func(
ds['value']['element_id'],
ds['value']['event_name'],
ds['value']['event_time']
))
query = ds.writeStream \
.format("console") \
.start()
query.awaitTermination()
错误消息:_pickle.PicklingError:无法序列化对象:异常:您似乎正试图从广播变量、操作或转换中引用 SparkContext。 SparkContext 只能用在驱动程序上,不能用在它 运行 工作人员的代码中。有关详细信息,请参阅 SPARK-5063。
感谢任何帮助。
我试图从不允许的用户定义函数中访问 spark 上下文。
在 udf 中,我试图通过使用 spark 上下文写入 spark-redis。
我在Spark Structured Streaming中读取来自Kafka的事件,需要一个一个地处理事件并写入redis。我为此编写了一个 UDF,但它给了我 spark 上下文错误。
conf = SparkConf()\
.setAppName(spark_app_name)\
.setMaster(spark_master_url)\
.set("spark.redis.host", "redis")\
.set("spark.redis.port", "6379")\
.set("spark.redis.auth", "abc")
spark = SparkSession.builder\
.config(conf=conf)\
.getOrCreate()
def func(element, event, timestamp):
#redis i/o
pass
schema = ArrayType(StructType(
[
StructField("element_id", StringType()),
StructField("event_name", StringType()),
StructField("event_time", StringType())
]
))
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "kafka:9092") \
.option("subscribe", topic) \
.load()
#.option("includeTimestamp", value = True)\
ds = df.selectExpr(("CAST(value AS STRING)"))\
.withColumn("value", explode(from_json("value", schema)))
filter_func = udf(func, ArrayType(StringType()))
ds = ds.withColumn("column_name", filter_func(
ds['value']['element_id'],
ds['value']['event_name'],
ds['value']['event_time']
))
query = ds.writeStream \
.format("console") \
.start()
query.awaitTermination()
错误消息:_pickle.PicklingError:无法序列化对象:异常:您似乎正试图从广播变量、操作或转换中引用 SparkContext。 SparkContext 只能用在驱动程序上,不能用在它 运行 工作人员的代码中。有关详细信息,请参阅 SPARK-5063。
感谢任何帮助。
我试图从不允许的用户定义函数中访问 spark 上下文。 在 udf 中,我试图通过使用 spark 上下文写入 spark-redis。