将数据从胶水加载到雪花
Loading data from glue to snowflake
我正在尝试 运行 glue 上的 ETL 作业,我正在将数据从 mongodb 提取到 spark 数据帧中,然后将其加载到 snowflake 中。
这是 Spark 数据帧的示例模式
|-- login: struct (nullable = true)
| |-- login_attempts: integer (nullable = true)
| |-- last_attempt: timestamp (nullable = true)
|-- name: string (nullable = true)
|-- notifications: struct (nullable = true)
| |-- bot_review_queue: boolean (nullable = true)
| |-- bot_review_queue_web_push: boolean (nullable = true)
| |-- bot_review_queue_web_push_admin: boolean (nullable = true)
| |-- weekly_account_summary: struct (nullable = true)
| | |-- enabled: boolean (nullable = true)
| |-- weekly_summary: struct (nullable = true)
| | |-- enabled: boolean (nullable = true)
| | |-- day: integer (nullable = true)
| | |-- hour: integer (nullable = true)
| | |-- minute: integer (nullable = true)
|-- query: struct (nullable = true)
| |-- email_address: string (nullable = true)
我正在尝试将数据按原样加载到雪花中,并将列结构化为雪花中的 json 有效负载,但它会抛出以下错误
An error occurred while calling o81.collectToPython.com.mongodb.spark.exceptions.MongoTypeConversionException:Cannot cast ARRAY into a StructType
我还尝试将结构列转换为字符串并加载它,但它或多或少会抛出相同的错误
An error occurred while calling o106.save. com.mongodb.spark.exceptions.MongoTypeConversionException: Cannot cast STRING into a StructType
如果能得到一些帮助,我将不胜感激。
以下用于转换和加载的代码。
dynamic_frame = glueContext.create_dynamic_frame.from_options(connection_type="mongodb",
connection_options=read_mongo_options)
user_df_cast = user_df.select(user_df.login.cast(StringType()),'name',user_df.notifications.cast(StringType()))
datasinkusers = user_df_cast.write.format(SNOWFLAKE_SOURCE_NAME).options(**sfOptions).option("dbtable", "users").mode("append").save()
如果您在 Snowflake 中的 users
table 具有以下架构,则 不需要转换,因为 StructType
fields of a SparkSQL DataFrame will map to the VARIANT
type in Snowflake 自动:
CREATE TABLE users (
login VARIANT
,name STRING
,notifications VARIANT
,query VARIANT
)
只需执行以下操作,无需任何转换,因为 Snowflake Spark 连接器 understands the data-type 会自行转换为适当的 JSON 表示形式:
user_df = glueContext.create_dynamic_frame.from_options(
connection_type="mongodb",
connection_options=read_mongo_options
)
user_df
.toDF()
.write
.format(SNOWFLAKE_SOURCE_NAME)
.options(**sfOptions)
.option("dbtable", "users")
.mode("append")
.save()
如果您绝对需要将 StructType
字段存储为纯 JSON 字符串,则需要使用 to_json
SparkSQL function:
显式转换它们
from pyspark.sql.functions import to_json
user_df_cast = user_df.select(
to_json(user_df.login),
user_df.name,
to_json(user_df.notifications)
)
这会将 JSON 字符串存储为简单的 VARCHAR
类型,这不会让您利用 Snowflake 的 semi-structured data storage and querying capabilities directly without a PARSE_JSON
步骤(低效)。
考虑使用上面显示的 VARIANT
方法,这将允许您直接对字段执行查询:
SELECT
login:login_attempts
,login:last_attempt
,name
,notifications:weekly_summary.enabled
FROM users
我正在尝试 运行 glue 上的 ETL 作业,我正在将数据从 mongodb 提取到 spark 数据帧中,然后将其加载到 snowflake 中。
这是 Spark 数据帧的示例模式
|-- login: struct (nullable = true)
| |-- login_attempts: integer (nullable = true)
| |-- last_attempt: timestamp (nullable = true)
|-- name: string (nullable = true)
|-- notifications: struct (nullable = true)
| |-- bot_review_queue: boolean (nullable = true)
| |-- bot_review_queue_web_push: boolean (nullable = true)
| |-- bot_review_queue_web_push_admin: boolean (nullable = true)
| |-- weekly_account_summary: struct (nullable = true)
| | |-- enabled: boolean (nullable = true)
| |-- weekly_summary: struct (nullable = true)
| | |-- enabled: boolean (nullable = true)
| | |-- day: integer (nullable = true)
| | |-- hour: integer (nullable = true)
| | |-- minute: integer (nullable = true)
|-- query: struct (nullable = true)
| |-- email_address: string (nullable = true)
我正在尝试将数据按原样加载到雪花中,并将列结构化为雪花中的 json 有效负载,但它会抛出以下错误
An error occurred while calling o81.collectToPython.com.mongodb.spark.exceptions.MongoTypeConversionException:Cannot cast ARRAY into a StructType
我还尝试将结构列转换为字符串并加载它,但它或多或少会抛出相同的错误
An error occurred while calling o106.save. com.mongodb.spark.exceptions.MongoTypeConversionException: Cannot cast STRING into a StructType
如果能得到一些帮助,我将不胜感激。
以下用于转换和加载的代码。
dynamic_frame = glueContext.create_dynamic_frame.from_options(connection_type="mongodb",
connection_options=read_mongo_options)
user_df_cast = user_df.select(user_df.login.cast(StringType()),'name',user_df.notifications.cast(StringType()))
datasinkusers = user_df_cast.write.format(SNOWFLAKE_SOURCE_NAME).options(**sfOptions).option("dbtable", "users").mode("append").save()
如果您在 Snowflake 中的 users
table 具有以下架构,则 不需要转换,因为 StructType
fields of a SparkSQL DataFrame will map to the VARIANT
type in Snowflake 自动:
CREATE TABLE users (
login VARIANT
,name STRING
,notifications VARIANT
,query VARIANT
)
只需执行以下操作,无需任何转换,因为 Snowflake Spark 连接器 understands the data-type 会自行转换为适当的 JSON 表示形式:
user_df = glueContext.create_dynamic_frame.from_options(
connection_type="mongodb",
connection_options=read_mongo_options
)
user_df
.toDF()
.write
.format(SNOWFLAKE_SOURCE_NAME)
.options(**sfOptions)
.option("dbtable", "users")
.mode("append")
.save()
如果您绝对需要将 StructType
字段存储为纯 JSON 字符串,则需要使用 to_json
SparkSQL function:
from pyspark.sql.functions import to_json
user_df_cast = user_df.select(
to_json(user_df.login),
user_df.name,
to_json(user_df.notifications)
)
这会将 JSON 字符串存储为简单的 VARCHAR
类型,这不会让您利用 Snowflake 的 semi-structured data storage and querying capabilities directly without a PARSE_JSON
步骤(低效)。
考虑使用上面显示的 VARIANT
方法,这将允许您直接对字段执行查询:
SELECT
login:login_attempts
,login:last_attempt
,name
,notifications:weekly_summary.enabled
FROM users