从 spark udf 记录到驱动程序

log from spark udf to driver

我在 spark 中使用的数据块中有一个简单的 UDF。我不能使用 println 或 log4j 或其他东西,因为它会被输出到执行中,我需要它在驱动程序中。我有一个非常系统的日志设置

var logMessage = ""

def log(msg: String){
  logMessage += msg + "\n"
}

def writeLog(file: String){
  println("start write")
  println(logMessage)
  println("end write")
}

def warning(msg: String){
  log("*WARNING* " + msg)
}

val CleanText = (s: int) => {
  log("I am in this UDF")
  s+2
}

sqlContext.udf.register("CleanText", CleanText)

我怎样才能让它正常运行并登录到驱动程序?

你不能......除非你想发疯并制作某种通过网络发送日志的日志返回附加程序或类似的东西。

当您评估数据框时,UDF 的代码将在所有执行程序上 运行。因此,您可能有 2000 台主机 运行 并且每个主机都将登录到自己的位置;这就是 Spark 的工作原理。驱动程序不是 运行 代码,因此无法登录。

您可以使用 YARN 日志聚合从执行程序中提取所有日志以供以后分析。

您可能还可以通过一些工作写入 kafka 流或类似的创意,然后在流中连续写入日志。

Apache Spark 中与您尝试执行的操作最接近的机制是 累加器。您可以在执行程序上累积日志行并在驱动程序中访问结果:

// create a collection accumulator using the spark context:
val logLines: CollectionAccumulator[String] = sc.collectionAccumulator("log")

// log function adds a line to accumulator
def log(msg: String): Unit = logLines.add(msg)

// driver-side function can print the log using accumulator's *value*
def writeLog() {
  import scala.collection.JavaConverters._
  println("start write")
  logLines.value.asScala.foreach(println)
  println("end write")
}

val CleanText = udf((s: Int) => {
  log(s"I am in this UDF, got: $s")
  s+2
})

// use UDF in some transformation:
Seq(1, 2).toDF("a").select(CleanText($"a")).show()

writeLog()    
// prints: 
// start write
// I am in this UDF, got: 1
// I am in this UDF, got: 2
// end write

但是:这不是真正推荐的,尤其是不用于记录目的。如果您登录每条记录,这个累加器最终会 使您的驱动程序崩溃 OutOfMemoryError 或者只会让您的速度变得非常慢。

由于您使用的是 Databricks,我会检查它们支持日志聚合的选项,或者只是使用 Spark UI 查看执行程序日志。