用Spark快速生成parquet数据文件用于测试的方法是什么Hive/Presto/Drill/etc?

What is a fast way to generate parquet data files with Spark for testing Hive/Presto/Drill/etc?

我经常发现自己需要生成 parquet 文件来测试 Hive、Presto、Drill 等基础设施组件

网上的镶木地板样本数据集出奇地少,我在这里遇到的仅有的一个 https://github.com/Teradata/kylo/tree/master/samples/sample-data/parquet 是信用卡号、收入等的模拟数据。我不喜欢把它放在我的数据湖以防有人认为它是真实的。

当您需要测试时,生成 parquet 数据文件的最佳方式是什么?我通常会产生火花并最终使用它;我会 post 我的解决方案作为答案,因为这里似乎不存在。但我很好奇人们使用 spark 或其他技术有什么更好的解决方案。

我对这个问题的正常解决方案是使用 Spark 和 Scala 中的可变列表来构建一些简单的示例数据。我会根据需要引入日期和其他各种数据类型,但这是我通常采用的方式。

基本上,我只是将可变列表变成一个数据框并合并到输出中我需要的目标文件的数量,然后保存到 parquet。

//Create a mutable list buffer based on a loop.
import scala.collection.mutable.ListBuffer
var lb = ListBuffer[(Int, Int, String)]()
for (i <- 1 to 5000) {
  lb += ((i, i*i, "Number is " + i + "."))
}

//Convert it to a data frame.
import spark.implicits._
val df = lb.toDF("value", "square", "description")

df.coalesce(5).write.mode(SaveMode.Overwrite).parquet("<your-hdfs-path>/name.parquet")

不过,如果能有一种不产生火花的方法就太好了。另外,如果我想要更大的数据集,我必须修改它以避免在驱动程序中生成所有记录;这更适用于小到 mid-size 的数据集。

我猜主要目标是生成数据,而不是将其写入某种格式。

让我们从一个非常简单的例子开始。

要生成任意DataFrame,首先需要的是它的架构。 此后我将使用一个非常简单的模式对一些用户事务进行建模。

val transactionsSchema: StructType = new StructType()
    .add("user_id", IntegerType)
    .add("ts", TimestampType)
    .add("amount", DoubleType)

com.holdenkarau.spark.testing 有一个对象 DataframeGenerator。 该对象有两种方法生成数据帧:.arbitraryDataFrame(完全随机结果)和.arbitraryDataFrameWithCustomFields(您可以在其中为给定属性设置自定义生成器,其他属性将自动生成)。

DataFrame 生成器获取 sqlContext 和模式作为输入。

val transactionsDFGenerator: Arbitrary[DataFrame] =
    DataframeGenerator.arbitraryDataFrame(spark.sqlContext, transactionsSchema)

以及获取随机DataFrame的函数。

def generateTransactionsDF(): DataFrame =
    transactionsDFGenerator
      .arbitrary(Gen.Parameters.default, Seed(100), 10)
      .get

这是生成的数据集:

+-----------+------------------------------+-----------------------+
|user_id    |ts                            |amount                 |
+-----------+------------------------------+-----------------------+
|-375726664 |1970-01-01 03:00:00.001       |-2.9945060451319086E271|
|0          |1970-01-01 02:59:59.999       |-4.774320614638788E-237|
|1          |215666-12-06 17:54:3333.972832|8.78381185978856E96    |
|-2147483648|1970-01-01 03:00:00.001       |1.6036825986813454E58  |
|568605722  |219978-07-03 23:47:3737.050592|6.632020739877623E-165 |
|-989197852 |1970-01-01 03:00:00.001       |8.92083260179676E233   |
|-2147483648|264209-01-26 00:54:2525.980256|-7.986228470636884E-216|
|0          |145365-06-27 03:25:5656.721168|-5.607570396263688E-45 |
|-1         |1970-01-01 02:59:59.999       |2.4723152616146036E-227|
|-2147483648|4961-05-03 05:19:42.439408    |1.9109576041021605E83  |
+-----------+------------------------------+-----------------------+

完整代码:

import co.featr.sia.utils.spark.getSparkSession
import com.holdenkarau.spark.testing.DataframeGenerator
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.types.{DoubleType, IntegerType, StructType, TimestampType}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.scalacheck.rng.Seed
import org.scalacheck.{Arbitrary, Gen}

object GenerateData {
  Logger.getLogger("org").setLevel(Level.OFF)
  def main(args: Array[String]): Unit = {
    val spark = spark.builder.master("local").getOrCreate()
    val runner = new GenerateData(spark)
    runner.run()
  }
}

class GenerateData(spark: SparkSession) {

  def run(): Unit = {
    val df: DataFrame = generateTransactionsDF()
    df.show(10, false)
  }

  def generateTransactionsDF(): DataFrame =
    transactionsDFGenerator
      .arbitrary(Gen.Parameters.default, Seed(100))
      .get

  lazy val transactionsDFGenerator: Arbitrary[DataFrame] =
    DataframeGenerator.arbitraryDataFrame(spark.sqlContext, transactionsSchema, 10)

  lazy val transactionsSchema: StructType = new StructType()
    .add("user_id", IntegerType)
    .add("ts", TimestampType)
    .add("amount", DoubleType)
}

Python 的 pyarrow 库允许您仅用几行代码从 pandas DataFrame 编写 parquet。

https://arrow.apache.org/docs/python/parquet.html

farsante 库可让您生成伪造的 PySpark / Pandas 数据集,这些数据集可以轻松地以 Parquet 文件格式写出。这是一个例子:

import farsante
from mimesis import Person
from mimesis import Address
from mimesis import Datetime

person = Person()
address = Address()
datetime = Datetime()
df = farsante.pyspark_df([person.full_name, person.email, address.city, address.state, datetime.datetime], 3)
df.write.mode('overwrite').parquet('./tmp/spark_fake_data')

简单地使用 Pandas 写出示例 Parquet 文件会更容易。像这样的任务不需要 Spark。

df = farsante.pandas_df([person.full_name, person.email, address.city, address.state, datetime.datetime], 3)
df.to_parquet('./tmp/fake_data.parquet', index=False)

好像有一个Scala faker library but it doesn't look nearly as mature as the mimesis library。 Go 有很好的 faker 和 Parquet 库,所以这是生成假数据的另一种选择。