如果数据不符合数据框模式,如何强制 Glue DynamicFrame 失败?
how to force Glue DynamicFrame to fail if data doesn't conform to the dataframe schema?
我有一个 Glue 作业(运行 on spark),它只是将 CSV 文件转换为 Parquet。我无法控制 CSV 数据,因此我想在转换为 Parquet 期间捕获数据与 table 架构之间的任何不一致。例如,如果一列被定义为整数,如果该列中有任何字符串值,我希望作业给我一个错误!目前,DynamicFrame 通过在生成的 Parquet 文件中提供选项(字符串和整数)来解决这个问题!这对某些用例很有帮助,但我想知道是否有任何方法可以强制执行模式并让胶水作业在存在任何不一致时抛出错误。这是我的代码:
datasource0 = glueContext.create_dynamic_frame.from_catalog(databasem=mdbName, table_namem=mtable, transformation_ctx="datasource0")
df = datasource0.toDF()
df = df.coalesce(parquetFileCount)
df = convertColDataType(df, "timestamp", "timestamp", dbName, table)
applymapping1 = DynamicFrame.fromDF(df,glueContext,"finalDF")
datasink4 = glueContext.write_dynamic_frame.from_options(frame = applymapping1, connection_type = "s3", connection_options = {"path": path}, format = "parquet", transformation_ctx = "datasink4")
job.commit()
我有一个类似的数据类型问题,我可以通过导入另一个我知道格式正确的 df 来解决。然后我遍历了两个 df 的列并比较了它们的数据类型。在这个例子中,我在必要时重新格式化了数据类型:
df1 = inputfile
df2 = target
if df1.schema != df2.schema:
colnames = df2.schema.names
for colname in colnames:
df1DataType = get_dtype(df1, colname)
df2DataType = get_dtype(df2, colname)
if df1DataType != df2DataType:
if df1DataType == 'timestamp':
not_string = ''
df2 = df2.withColumn(colname, df2[colname].cast(TimestampType()))
elif df1DataType == 'double':
not_string = ''
df2 = df2.withColumn(colname, df2[colname].cast(DoubleType()))
elif df1DataType == 'int':
not_string = ''
df2 = df2.withColumn(colname, df2[colname].cast(IntegerType()))
else:
not_string = 'not '
print(not_string + 'updating: ' + colname + ' - from ' + df2DataType + ' to ' + df1DataType)
target = df2
你可以使用 spark native lib 而不是 glue lib 来解决这个问题
不是从目录中读取,而是使用自定义架构和 failfast 模式从相应的 s3 路径读取
schema = StructType ([StructField ('id', IntegerType(), True),
StructField ('name', StringType(), True)]
df = spark.read.option('mode', 'FAILFAST').csv(s3Path, schema=schema)
我有一个 Glue 作业(运行 on spark),它只是将 CSV 文件转换为 Parquet。我无法控制 CSV 数据,因此我想在转换为 Parquet 期间捕获数据与 table 架构之间的任何不一致。例如,如果一列被定义为整数,如果该列中有任何字符串值,我希望作业给我一个错误!目前,DynamicFrame 通过在生成的 Parquet 文件中提供选项(字符串和整数)来解决这个问题!这对某些用例很有帮助,但我想知道是否有任何方法可以强制执行模式并让胶水作业在存在任何不一致时抛出错误。这是我的代码:
datasource0 = glueContext.create_dynamic_frame.from_catalog(databasem=mdbName, table_namem=mtable, transformation_ctx="datasource0")
df = datasource0.toDF()
df = df.coalesce(parquetFileCount)
df = convertColDataType(df, "timestamp", "timestamp", dbName, table)
applymapping1 = DynamicFrame.fromDF(df,glueContext,"finalDF")
datasink4 = glueContext.write_dynamic_frame.from_options(frame = applymapping1, connection_type = "s3", connection_options = {"path": path}, format = "parquet", transformation_ctx = "datasink4")
job.commit()
我有一个类似的数据类型问题,我可以通过导入另一个我知道格式正确的 df 来解决。然后我遍历了两个 df 的列并比较了它们的数据类型。在这个例子中,我在必要时重新格式化了数据类型:
df1 = inputfile
df2 = target
if df1.schema != df2.schema:
colnames = df2.schema.names
for colname in colnames:
df1DataType = get_dtype(df1, colname)
df2DataType = get_dtype(df2, colname)
if df1DataType != df2DataType:
if df1DataType == 'timestamp':
not_string = ''
df2 = df2.withColumn(colname, df2[colname].cast(TimestampType()))
elif df1DataType == 'double':
not_string = ''
df2 = df2.withColumn(colname, df2[colname].cast(DoubleType()))
elif df1DataType == 'int':
not_string = ''
df2 = df2.withColumn(colname, df2[colname].cast(IntegerType()))
else:
not_string = 'not '
print(not_string + 'updating: ' + colname + ' - from ' + df2DataType + ' to ' + df1DataType)
target = df2
你可以使用 spark native lib 而不是 glue lib 来解决这个问题
不是从目录中读取,而是使用自定义架构和 failfast 模式从相应的 s3 路径读取
schema = StructType ([StructField ('id', IntegerType(), True),
StructField ('name', StringType(), True)]
df = spark.read.option('mode', 'FAILFAST').csv(s3Path, schema=schema)