在 Databricks 中调用一个函数,每个元素都是一个流
Call a function with each element a stream in Databricks
我在 Databricks 中有一个 DataFrame 流,我想对每个元素执行一个操作。在网上找到了一些特定用途的方法,比如写到控制台或者转储到内存中,但是我想添加一些业务逻辑,并将一些结果放入Redis。
更具体地说,这是非流情况下的样子:
val someDataFrame = Seq(
("key1", "value1"),
("key2", "value2"),
("key3", "value3"),
("key4", "value4")
).toDF()
def someFunction(keyValuePair: (String, String)) = {
println(keyValuePair)
}
someDataFrame.collect.foreach(r => someFunction((r(0).toString, r(1).toString)))
但是如果someDataFrame
不是简单数据帧而是流数据帧(确实来自Kafka),报错信息是这样的:
org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
谁能帮我解决这个问题?
一些重要说明:
我已经阅读了相关文档,例如 Spark Streaming 或 Databricks Streaming 以及其他一些描述。
我知道肯定有start()
和awaitTermination
之类的东西,但我不知道确切的语法。这些描述没有帮助。
列出我尝试过的所有可能性需要好几页,所以我宁愿不提供它们。
我不是想解决显示结果的具体问题。 IE。请不要为此特定案例提供解决方案。 someFunction
看起来像这样:
val someData = readSomeExternalData()
if (condition containing keyValuePair and someData) {
doSomething(keyValuePair);
}
(问题 What is the purpose of ForeachWriter in Spark Structured Streaming? 没有提供工作示例,因此没有回答我的问题。)
这是一个使用 foreachBatch 读取的示例,使用流 api.
将每个项目保存到 redis
与上一个问题相关()
// import spark and spark-redis
import org.apache.spark._
import org.apache.spark.sql._
import org.apache.spark.streaming._
import org.apache.spark.sql.types._
import com.redislabs.provider.redis._
// schema of csv files
val userSchema = new StructType()
.add("name", "string")
.add("age", "string")
// create a data stream reader from a dir with csv files
val csvDF = spark
.readStream
.format("csv")
.option("sep", ";")
.schema(userSchema)
.load("./data") // directory where the CSV files are
// redis
val redisConfig = new RedisConfig(new RedisEndpoint("localhost", 6379))
implicit val readWriteConfig: ReadWriteConfig = ReadWriteConfig.Default
csvDF.map(r => (r.getString(0), r.getString(0))) // converts the dataset to a Dataset[(String, String)]
.writeStream // create a data stream writer
.foreachBatch((df, _) => sc.toRedisKV(df.rdd)(redisConfig)) // save each batch to redis after converting it to a RDD
.start // start processing
在 Spark Streaming 中为每个批次调用简单的用户定义函数。
请试试这个,
它将为来自 tcp socket
的每条消息打印 'hello world'
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
spark = SparkSession .builder .appName("StructuredNetworkWordCount") .getOrCreate()
# Create DataFrame representing the stream of input lines from connection tolocalhost:9999
lines = spark .readStream .format("socket") .option("host", "localhost") .option("port", 9999) .load()
# Split the lines into words
words = lines.select(
explode(
split(lines.value, " ")
).alias("word")
)
# Generate running word count
wordCounts = words.groupBy("word").count()
# Start running the query that prints the running counts to the console
def process_row(df, epoch_id):
# # Write row to storage
print('hello world')
query = words.writeStream.foreachBatch(process_row).start()
#query = wordCounts .writeStream .outputMode("complete") .format("console") .start()
query.awaitTermination()
我在 Databricks 中有一个 DataFrame 流,我想对每个元素执行一个操作。在网上找到了一些特定用途的方法,比如写到控制台或者转储到内存中,但是我想添加一些业务逻辑,并将一些结果放入Redis。
更具体地说,这是非流情况下的样子:
val someDataFrame = Seq(
("key1", "value1"),
("key2", "value2"),
("key3", "value3"),
("key4", "value4")
).toDF()
def someFunction(keyValuePair: (String, String)) = {
println(keyValuePair)
}
someDataFrame.collect.foreach(r => someFunction((r(0).toString, r(1).toString)))
但是如果someDataFrame
不是简单数据帧而是流数据帧(确实来自Kafka),报错信息是这样的:
org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
谁能帮我解决这个问题?
一些重要说明:
我已经阅读了相关文档,例如 Spark Streaming 或 Databricks Streaming 以及其他一些描述。
我知道肯定有
start()
和awaitTermination
之类的东西,但我不知道确切的语法。这些描述没有帮助。列出我尝试过的所有可能性需要好几页,所以我宁愿不提供它们。
我不是想解决显示结果的具体问题。 IE。请不要为此特定案例提供解决方案。
someFunction
看起来像这样:
val someData = readSomeExternalData()
if (condition containing keyValuePair and someData) {
doSomething(keyValuePair);
}
(问题 What is the purpose of ForeachWriter in Spark Structured Streaming? 没有提供工作示例,因此没有回答我的问题。)
这是一个使用 foreachBatch 读取的示例,使用流 api.
将每个项目保存到 redis与上一个问题相关(
// import spark and spark-redis
import org.apache.spark._
import org.apache.spark.sql._
import org.apache.spark.streaming._
import org.apache.spark.sql.types._
import com.redislabs.provider.redis._
// schema of csv files
val userSchema = new StructType()
.add("name", "string")
.add("age", "string")
// create a data stream reader from a dir with csv files
val csvDF = spark
.readStream
.format("csv")
.option("sep", ";")
.schema(userSchema)
.load("./data") // directory where the CSV files are
// redis
val redisConfig = new RedisConfig(new RedisEndpoint("localhost", 6379))
implicit val readWriteConfig: ReadWriteConfig = ReadWriteConfig.Default
csvDF.map(r => (r.getString(0), r.getString(0))) // converts the dataset to a Dataset[(String, String)]
.writeStream // create a data stream writer
.foreachBatch((df, _) => sc.toRedisKV(df.rdd)(redisConfig)) // save each batch to redis after converting it to a RDD
.start // start processing
在 Spark Streaming 中为每个批次调用简单的用户定义函数。
请试试这个, 它将为来自 tcp socket
的每条消息打印 'hello world'from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
spark = SparkSession .builder .appName("StructuredNetworkWordCount") .getOrCreate()
# Create DataFrame representing the stream of input lines from connection tolocalhost:9999
lines = spark .readStream .format("socket") .option("host", "localhost") .option("port", 9999) .load()
# Split the lines into words
words = lines.select(
explode(
split(lines.value, " ")
).alias("word")
)
# Generate running word count
wordCounts = words.groupBy("word").count()
# Start running the query that prints the running counts to the console
def process_row(df, epoch_id):
# # Write row to storage
print('hello world')
query = words.writeStream.foreachBatch(process_row).start()
#query = wordCounts .writeStream .outputMode("complete") .format("console") .start()
query.awaitTermination()