如何在 Databricks 流中 运行 Scala 中的 if else 语句
How to run an if else statement in Scala in Databricks streaming
我是 Scala 和 Databricks 流媒体的新手。我正在将流式事件读入数据帧,我想使用 if-else 语句根据数据帧是否为空来触发不同的笔记本。下面的简单代码(及其变体)
if(finalDF.isEmpty){
print("0")
}
else{
print("1")
}
持续导致以下错误
AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
eventhubs
如何将 writeStream.start() 合并到上面的代码中?或者,我如何评估数据框内容并基于此采取一个或另一个操作,因为数据框是由流式事件填充到其中的?
流式 DF 不能为空或设计为非空 - 流是无限的,如果您现在没有数据,那么您可以在下一秒获得新的东西。所以你的代码将不起作用。
您可以使用 foreachBatch 处理数据的“当前”快照,您可以像处理“正常”的非流式数据帧一样处理这些数据,但是您 可能无法 从里面触发笔记本,所以两种情况的代码应该在同一个函数中,而不是在不同的笔记本中。
我测试了这段代码,它可以作为一种引入 if-else 并根据事件内容决定操作的方法。
df.writeStream.foreachBatch((df: org.apache.spark.sql.DataFrame, batchID: Long) => myfunc(df)).start()
def myfunc(df: org.apache.spark.sql.DataFrame){
val test1= df.filter($"col" === "test1")
val test2= df.filter($"col" === "test2")
if(test1.count()>0){
dbutils.notebook.run("some_notebook", 60)
}
if(test2.count()>0){
dbutils.notebook.run("another_notebook", 60)
}
}
我是 Scala 和 Databricks 流媒体的新手。我正在将流式事件读入数据帧,我想使用 if-else 语句根据数据帧是否为空来触发不同的笔记本。下面的简单代码(及其变体)
if(finalDF.isEmpty){
print("0")
}
else{
print("1")
}
持续导致以下错误
AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
eventhubs
如何将 writeStream.start() 合并到上面的代码中?或者,我如何评估数据框内容并基于此采取一个或另一个操作,因为数据框是由流式事件填充到其中的?
流式 DF 不能为空或设计为非空 - 流是无限的,如果您现在没有数据,那么您可以在下一秒获得新的东西。所以你的代码将不起作用。
您可以使用 foreachBatch 处理数据的“当前”快照,您可以像处理“正常”的非流式数据帧一样处理这些数据,但是您 可能无法 从里面触发笔记本,所以两种情况的代码应该在同一个函数中,而不是在不同的笔记本中。
我测试了这段代码,它可以作为一种引入 if-else 并根据事件内容决定操作的方法。
df.writeStream.foreachBatch((df: org.apache.spark.sql.DataFrame, batchID: Long) => myfunc(df)).start()
def myfunc(df: org.apache.spark.sql.DataFrame){
val test1= df.filter($"col" === "test1")
val test2= df.filter($"col" === "test2")
if(test1.count()>0){
dbutils.notebook.run("some_notebook", 60)
}
if(test2.count()>0){
dbutils.notebook.run("another_notebook", 60)
}
}