本地 Spark 配置
Local Spark config
我在 docker 中创建了本地 spark 环境。我打算将其用作 CICD 管道的一部分,用于在 spark 环境中执行的单元测试代码。我有两个要使用的脚本:一个将创建一组持久性 spark 数据库和 tables,另一个将读取那些 tables。尽管 tables 应该是持久的,但它们只在特定的 spark 会话中持久存在。如果我创建一个新的 spark 会话,我无法访问 tables,即使它在文件系统中是可见的。代码示例如下:
创建数据库并table
Create_script.py
from pyspark.sql import SparkSession
def main():
spark = SparkSession.builder.appName('Example').getOrCreate()
columns = ["language", "users_count"]
data = [("Java", "20000"), ("Python", "100000"), ("Scala", "3000")]
rdd = spark.sparkContext.parallelize(data)
df = rdd.toDF(columns)
spark.sql("create database if not exists schema1")
df.write.mode("ignore").saveAsTable('schema1.table1')
加载数据
load_data.py
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('example').getOrCreate()
df = spark.sql("select * from schema1.table1")
我知道有一个问题,因为我 运行 这个命令:print(spark.catalog.listDatabases()) 它只能找到数据库默认值。但是如果我导入 Create_script.py 那么它会找到 schema1 db.
如何在所有 spark 会话中持久化 tables?
/repo/test/spark-warehouse
中的这些文件只是表的数据,没有database/table/column的元信息。
如果您不启用 Hive,Spark 将使用 InMemoryCatalog
,它是临时的,仅用于测试,仅在相同的 spark 上下文中可用。此 InMemoryCatalog
不提供任何从文件系统加载 db/table 的功能。
所以有两种方法:
分栏格式
spark.write.orc()
,在你的Create_script.py
脚本中将数据写入orc/parquet格式。 orc/parquet 格式化将列信息与数据放在一起。
val df = spark.read.orc()
,然后 createOrReplaceTempView
如果你需要在 sql 中使用它。
使用嵌入 Hive
无需安装Hive,Spark可以嵌入hive,只需两步。
- 添加 spark-hive 依赖项。 (我正在使用 Java,它使用 pom.xml 来管理依赖项,我不知道如何在 pyspark 中执行)
SparkSession.builder().enableHiveSupport()
然后数据将是 /repo/test/spark-warehouse/schema1.db
,元信息将是 /repo/test/metastore_db
,其中包含 Derby 数据库的文件。您可以跨所有 spark 会话读取或写入表。
我在 docker 中创建了本地 spark 环境。我打算将其用作 CICD 管道的一部分,用于在 spark 环境中执行的单元测试代码。我有两个要使用的脚本:一个将创建一组持久性 spark 数据库和 tables,另一个将读取那些 tables。尽管 tables 应该是持久的,但它们只在特定的 spark 会话中持久存在。如果我创建一个新的 spark 会话,我无法访问 tables,即使它在文件系统中是可见的。代码示例如下:
创建数据库并table
Create_script.py
from pyspark.sql import SparkSession
def main():
spark = SparkSession.builder.appName('Example').getOrCreate()
columns = ["language", "users_count"]
data = [("Java", "20000"), ("Python", "100000"), ("Scala", "3000")]
rdd = spark.sparkContext.parallelize(data)
df = rdd.toDF(columns)
spark.sql("create database if not exists schema1")
df.write.mode("ignore").saveAsTable('schema1.table1')
加载数据
load_data.py
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('example').getOrCreate()
df = spark.sql("select * from schema1.table1")
我知道有一个问题,因为我 运行 这个命令:print(spark.catalog.listDatabases()) 它只能找到数据库默认值。但是如果我导入 Create_script.py 那么它会找到 schema1 db.
如何在所有 spark 会话中持久化 tables?
/repo/test/spark-warehouse
中的这些文件只是表的数据,没有database/table/column的元信息。
如果您不启用 Hive,Spark 将使用 InMemoryCatalog
,它是临时的,仅用于测试,仅在相同的 spark 上下文中可用。此 InMemoryCatalog
不提供任何从文件系统加载 db/table 的功能。
所以有两种方法:
分栏格式
spark.write.orc()
,在你的Create_script.py
脚本中将数据写入orc/parquet格式。 orc/parquet 格式化将列信息与数据放在一起。val df = spark.read.orc()
,然后createOrReplaceTempView
如果你需要在 sql 中使用它。
使用嵌入 Hive
无需安装Hive,Spark可以嵌入hive,只需两步。
- 添加 spark-hive 依赖项。 (我正在使用 Java,它使用 pom.xml 来管理依赖项,我不知道如何在 pyspark 中执行)
SparkSession.builder().enableHiveSupport()
然后数据将是
/repo/test/spark-warehouse/schema1.db
,元信息将是/repo/test/metastore_db
,其中包含 Derby 数据库的文件。您可以跨所有 spark 会话读取或写入表。