Cannot resolve overloaded method window in Spark Structured Streaming (Scala)
The following code throws an overload-resolution error in Spark Structured Streaming (Scala).
Error:
Cannot resolve overloaded method window
Code
package Stream
import org.apache.spark.sql._
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.streaming.Trigger
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkContext
import org.apache.spark.sql.types._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
import org.apache.spark.sql.functions.window
object SparkRestApi {
  def main(args: Array[String]): Unit = {
    val logger = Logger.getLogger("Datapipeline")
    Logger.getLogger("org").setLevel(Level.WARN)
    Logger.getLogger("akka").setLevel(Level.WARN)

    val spark = SparkSession.builder()
      .appName("StreamTest")
      .config("spark.driver.memory", "2g")
      .master("local[*]")
      //.enableHiveSupport()
      .getOrCreate()
    import spark.implicits._

    val userSchema = new StructType()
      .add("id", "string")
      .add("Faulttime", "timestamp")
      .add("name", "string")
      .add("Parentgroup", "string")
      .add("childgroup", "string")
      .add("MountStyle", "string")

    val JSONDF = spark
      .readStream
      .option("header", true)
      .option("sep", ",")
      .schema(userSchema) // Specify schema of the csv files
      .json("D:/TEST")

    val windowColumn = window($"timestamp", "10 minutes", "5 minutes")

    val df2 = JSONDF.withWatermark("timestamp", "1 minutes")
      .groupBy("Parentgroup", "childgroup", "MountStyle", window("timestamp", "5 minutes", "1 minutes"))
      .agg(countDistinct("id"))

    df2.writeStream
      .outputMode("Append")
      .format("csv")
      .option("checkpointLocation", "D:/TEST/chkdir")
      .option("path", "D:/TEST/OutDir")
      .option("truncate", false)
      .start()
      .awaitTermination()

    spark.stop()
  }
}
It throws this error even after adding all the libraries. Many thanks in advance for any valuable suggestions.
An example from the manual:
val windowedCounts = words
  .withWatermark("timestamp", "10 minutes")
  .groupBy(
    window($"timestamp", "10 minutes", "5 minutes"),
    $"word")
  .count()
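For context on why the overload cannot be resolved: every window overload in org.apache.spark.sql.functions takes its time column as a Column, not a String, and Dataset.groupBy cannot mix String and Column arguments in a single call. A minimal contrast, as a sketch against a hypothetical DataFrame df with columns name and ts (both names are made up for illustration):

import org.apache.spark.sql.functions.{col, window}

// Does not resolve: window has no overload that takes a String time column,
// and this groupBy mixes String keys with a Column key.
// df.groupBy("name", window("ts", "10 minutes", "5 minutes")).count()

// Resolves: the time column is passed as a Column and every grouping key is a Column.
df.groupBy(window(col("ts"), "10 minutes", "5 minutes"), col("name")).count()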
My best guess: try putting your window clause first in the groupBy, and use $ (or col) for the field names so that every grouping key is a Column, as in the example.
val JSONDF = explodedf.withWatermark("timestamp", "1 minute")
val aggDF = JSONDF.groupBy(functions.window(JSONDF.col("timestamp"), "30 seconds", "30 seconds"), JSONDF.col("jsonData.name"))
  .avg("jsonData.price").alias("averagePrice")
Give this a try.
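Applied to the code in the question, here is a minimal sketch of the aggregation with the window clause first and every key passed as a Column. It only addresses the overload resolution and otherwise keeps the query as posted; it assumes the event-time field is the Faulttime column from the posted schema, since that schema has no timestamp column:

val df2 = JSONDF
  .withWatermark("Faulttime", "1 minutes")
  .groupBy(
    window($"Faulttime", "5 minutes", "1 minutes"), // time column passed as a Column
    $"Parentgroup", $"childgroup", $"MountStyle")   // remaining keys as Columns as well
  .agg(countDistinct("id"))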