本地 Spark 配置

Local Spark config

我在 docker 中创建了本地 spark 环境。我打算将其用作 CICD 管道的一部分,用于在 spark 环境中执行的单元测试代码。我有两个要使用的脚本:一个将创建一组持久性 spark 数据库和 tables,另一个将读取那些 tables。尽管 tables 应该是持久的,但它们只在特定的 spark 会话中持久存在。如果我创建一个新的 spark 会话,我无法访问 tables,即使它在文件系统中是可见的。代码示例如下:

创建数据库并table

Create_script.py

from pyspark.sql import SparkSession
def main():
    spark = SparkSession.builder.appName('Example').getOrCreate()
    columns = ["language", "users_count"]
    data = [("Java", "20000"), ("Python", "100000"), ("Scala", "3000")]
    rdd = spark.sparkContext.parallelize(data)
    df = rdd.toDF(columns)
    spark.sql("create database if not exists schema1")
    df.write.mode("ignore").saveAsTable('schema1.table1')

加载数据

load_data.py

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('example').getOrCreate()
df = spark.sql("select * from schema1.table1")

我知道有一个问题,因为我 运行 这个命令:print(spark.catalog.listDatabases()) 它只能找到数据库默认值。但是如果我导入 Create_script.py 那么它会找到 schema1 db.

如何在所有 spark 会话中持久化 tables?

/repo/test/spark-warehouse中的这些文件只是表的数据,没有database/table/column的元信息。

如果您不启用 Hive,Spark 将使用 InMemoryCatalog,它是临时的,仅用于测试,仅在相同的 spark 上下文中可用。此 InMemoryCatalog 不提供任何从文件系统加载 db/table 的功能。

所以有两种方法:

  1. 分栏格式

    • spark.write.orc(),在你的Create_script.py脚本中将数据写入orc/parquet格式。 orc/parquet 格式化将列信息与数据放在一起。
    • val df = spark.read.orc(),然后 createOrReplaceTempView 如果你需要在 sql 中使用它。
  2. 使用嵌入 Hive

    无需安装Hive,Spark可以嵌入hive,只需两步。

    • 添加 spark-hive 依赖项。 (我正在使用 Java,它使用 pom.xml 来管理依赖项,我不知道如何在 pyspark 中执行)
    • SparkSession.builder().enableHiveSupport()

    然后数据将是 /repo/test/spark-warehouse/schema1.db,元信息将是 /repo/test/metastore_db,其中包含 Derby 数据库的文件。您可以跨所有 spark 会话读取或写入表。