Apache Spark Data Generator Function on Databricks Not Working

I am trying to execute a data generator function provided by Microsoft, in order to test streaming data to Event Hubs.

Unfortunately, I keep receiving the error message

Processing failure: No such file or directory

when I try to execute the function:

%scala
DummyDataGenerator.start(15)

Can someone take a look at the code and help me figure out why I'm getting this error?

class DummyDataGenerator:
  streamDirectory = "/FileStore/tables/flight"
None # suppress output

I'm not sure how the cell above is tied into the DummyDataGenerator function.

%scala

import scala.util.Random
import java.io._
import java.time._

// Notebook #2 has to set this to 8, we are setting
// it to 200 to "restore" the default behavior.
spark.conf.set("spark.sql.shuffle.partitions", 200)

// Make the username available to all other languages.
// "WARNING: use of the "current" username is unpredictable
// when multiple users are collaborating and should be replaced
// with the notebook ID instead.
val username = com.databricks.logging.AttributionContext.current.tags(com.databricks.logging.BaseTagDefinitions.TAG_USER);
spark.conf.set("com.databricks.training.username", username)

object DummyDataGenerator extends Runnable {
  var runner : Thread = null;
  val className = getClass().getName()
  val streamDirectory = s"dbfs:/tmp/$username/new-flights"
  val airlines = Array( ("American", 0.17), ("Delta", 0.12), ("Frontier", 0.14), ("Hawaiian", 0.13), ("JetBlue", 0.15), ("United", 0.11), ("Southwest", 0.18) )
  val reasons = Array("Air Carrier", "Extreme Weather", "National Aviation System", "Security", "Late Aircraft")

  val rand = new Random(System.currentTimeMillis())
  var maxDuration = 3 * 60 * 1000 // default to three minutes

  def clean() {
    System.out.println("Removing old files for dummy data generator.")
    dbutils.fs.rm(streamDirectory, true)
    if (dbutils.fs.mkdirs(streamDirectory) == false) {
      throw new RuntimeException("Unable to create temp directory.")
    }
  }

  def run() {
    val date = LocalDate.now()
    val start = System.currentTimeMillis()

    while (System.currentTimeMillis() - start < maxDuration) {
      try {
        val dir = s"/dbfs/tmp/$username/new-flights"
        val tempFile = File.createTempFile("flights-", "", new File(dir)).getAbsolutePath()+".csv"
        val writer = new PrintWriter(tempFile)

        for (airline <- airlines) {
          val flightNumber = rand.nextInt(1000)+1000
          val deptTime = rand.nextInt(10)+10
          val departureTime = LocalDateTime.now().plusHours(-deptTime)
          val (name, odds) = airline
          val reason = Random.shuffle(reasons.toList).head
          val test = rand.nextDouble()

          val delay = if (test < odds)
            rand.nextInt(60)+(30*odds)
            else rand.nextInt(10)-5

          println(s"- Flight #$flightNumber by $name at $departureTime delayed $delay minutes due to $reason")
          writer.println(s""" "$flightNumber","$departureTime","$delay","$reason","$name" """.trim)
        }
        writer.close()

        // wait a couple of seconds
        //Thread.sleep(rand.nextInt(5000))

      } catch {
        case e: Exception => {
          printf("* Processing failure: %s%n", e.getMessage())
          return;
        }
      }
    }
    println("No more flights!")
  }

  def start(minutes:Int = 5) {
    maxDuration = minutes * 60 * 1000

    if (runner != null) {
      println("Stopping dummy data generator.")
      runner.interrupt();
      runner.join();
    }
    println(s"Running dummy data generator for $minutes minutes.")
    runner = new Thread(this);
    runner.run();
  }

  def stop() {
    start(0)
  }
}

DummyDataGenerator.clean()

displayHTML("Imported streaming logic...") // suppress output

This code won't work on Community Edition because of this line:

val dir = s"/dbfs/tmp/$username/new-flights"

That's because Databricks Community Edition doesn't have the DBFS fuse mount (it's only supported on full Databricks). It may be possible to make it work by:

  1. Changing that directory to a local directory, for example /tmp or similar
  2. Adding code (after writer.close()) to list the flights-* files in that local directory and move them into streamDirectory using dbutils.fs.mv (see the sketch below)
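
A minimal sketch of that workaround, assuming the rest of the generator is left unchanged (the local path /tmp/new-flights and the file-moving step are illustrative choices, not part of the original notebook):

%scala

// Inside run(): write to the local driver filesystem instead of the /dbfs fuse mount.
val localDir = new java.io.File("/tmp/new-flights")
localDir.mkdirs()

val tempFile = java.io.File.createTempFile("flights-", ".csv", localDir)
val writer = new java.io.PrintWriter(tempFile)
// ... write the CSV rows exactly as before ...
writer.close()

// After writer.close(): move the finished file from the local filesystem into DBFS.
dbutils.fs.mv("file:" + tempFile.getAbsolutePath, s"$streamDirectory/${tempFile.getName}")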

You should be able to use the Databricks Labs Data Generator on Databricks Community Edition. I've included instructions below:

Running the Databricks Labs Data Generator on Community Edition

The Databricks Labs Data Generator is a PySpark library, so the code to generate the data needs to be written in Python. However, if you prefer Scala, you should be able to create a view over the generated data and work with it from there.

  1. You can install the framework on Databricks Community Edition by creating a notebook with a cell that runs:

%pip install git+https://github.com/databrickslabs/dbldatagen

Once installed, you can use the library to define a data generation specification and call build on it to produce a Spark DataFrame.

The following example shows how to generate batch data similar to the dataset you were trying to generate. It should go in a separate notebook cell.

Note: 10 million records are generated here to illustrate the ability to create larger datasets; the library can be used to generate datasets much larger than that.

%python

import dbldatagen as dg  # Databricks Labs Data Generator

num_rows = 10 * 1000000  # number of rows to generate
num_partitions = 8  # number of Spark dataframe partitions

delay_reasons = ["Air Carrier", "Extreme Weather", "National Aviation System", "Security", "Late Aircraft"]

# will have implied column `id` for ordinal of row
flightdata_defn = (dg.DataGenerator(spark, name="flight_delay_data", rows=num_rows, partitions=num_partitions)
                 .withColumn("flightNumber", "int", minValue=1000, uniqueValues=10000, random=True)
                 .withColumn("airline", "string", minValue=1, maxValue=500,  prefix="airline", random=True, distribution="normal")
                 .withColumn("original_departure", "timestamp", begin="2020-01-01 01:00:00", end="2020-12-31 23:59:00", interval="1 minute", random=True)
                 .withColumn("delay_minutes", "int", minValue=20, maxValue=600, distribution=dg.distributions.Gamma(1.0, 2.0))
                 .withColumn("delayed_departure",  "timestamp", expr="cast(original_departure as bigint) +  (delay_minutes * 60) ", baseColumn=["original_departure", "delay_minutes"])
                 .withColumn("reason", "string", values=delay_reasons, random=True)
                )

df_flight_data = flightdata_defn.build()

display(df_flight_data)

You can find information on how to generate streaming data in the online documentation at https://databrickslabs.github.io/dbldatagen/public_docs/using_streaming_data.html

You can create a named temporary view over the data so that you can access it from SQL or Scala, using one of the two following approaches:

1: Use createOrReplaceTempView

df_flight_data.createOrReplaceTempView("delays")

2: Use an option on build. In this case, the name passed to the Data Generator instance initializer will be the name of the view

df_flight_data = flightdata_defn.build(withTempView=True)
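
With either approach, the generated data can then be queried from Scala or SQL through the view. A minimal sketch, assuming the view was registered as "delays" via the first approach:

%scala

// Read the Python-generated data back through the temporary view.
val delays = spark.table("delays")
delays.printSchema()
delays.groupBy("reason").count().show()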