Is there a way to simply truncate columns while inserting into Redshift via Glue?
I have a column larger than the varchar(max) data type, which as far as I know is the largest data type AWS Glue will use, and when I try to load my data into its table I get the error "String length exceeds DDL length". I wouldn't mind truncating the column, since it isn't that important, but I can't figure out how to do that in Glue. I know that if I connect to my database with psql from an EC2 instance, I can use TRUNCATECOLUMNS as a flag on the COPY command, and I can in fact load my table successfully that way. However, my boss insists that I use Glue for this job, so I'm looking for a way to truncate the column from a Glue script. I've looked through a lot of documentation but can't find anything like this. Thanks.
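For context, the COPY flag mentioned above looks something like this when run from psql (a sketch; the table name, S3 path, and IAM role are placeholders):
COPY entry FROM 's3://my-bucket/entry.csv'
IAM_ROLE 'arn:aws:iam::123456789012:role/MyRedshiftRole'
CSV TRUNCATECOLUMNS;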
Here is some working code for anyone else who runs into this problem and needs a complete reference. Note that varchar(65535) is the largest size a Redshift column can be declared, and that the limit is measured in bytes, not characters:
import com.amazonaws.services.glue.{ChoiceOption, DynamicFrame}
import com.amazonaws.services.glue.util.JsonOptions
import org.apache.spark.sql.functions._
// Truncate values longer than 29,999 characters; the null guard avoids an NPE on null values
val truncColUdf = udf((str: String) => if (str != null && str.length > 29999) str.substring(0, 29999) else str)
// Read the source table from the Glue Data Catalog
val datasource30 = glueContext.getCatalogSource(database = "database", tableName = "entry", redshiftTmpDir = "", transformationContext = "datasource30").getDynamicFrame()
// Apply the UDF on the Spark DataFrame, then convert back to a DynamicFrame
val revDF30 = datasource30.toDF()
  .withColumn("message", truncColUdf(col("message")))
val truncDynamicFrame30 = DynamicFrame(revDF30, glueContext)
// Map source fields to the target Redshift column types
val applymapping30 = truncDynamicFrame30.applyMapping(mappings = Seq(("id", "bigint", "id", "bigint"), ("message", "string", "message", "varchar(65535)"), ("state", "string", "state", "varchar(256)"), ("created_at", "timestamp", "created_at", "timestamp"), ("depth", "int", "depth", "int")), caseSensitive = false, transformationContext = "applymapping30")
val resolvechoice30 = applymapping30.resolveChoice(choiceOption = Some(ChoiceOption("make_cols")), transformationContext = "resolvechoice30")
val dropnullfields30 = resolvechoice30.dropNulls(transformationContext = "dropnullfields30")
// Write to Redshift through the catalog JDBC connection
val datasink30 = glueContext.getJDBCSink(catalogConnection = "databaseConnection", options = JsonOptions("""{"dbtable": "entry", "database": "database"}"""), redshiftTmpDir = args("TempDir"), transformationContext = "datasink30").writeDynamicFrame(dropnullfields30)
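Presumably the 29,999-character cutoff (rather than 65,535) leaves headroom for multi-byte characters, since Redshift measures varchar lengths in bytes; adjust the threshold to whatever your column actually holds.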
Here is a sample data row being read:
01,"<p>Here is the message where the quotations are in case of commas within the message, like so.</p>",active,2017-08-27 23:38:40,1
Convert the DynamicFrame to a Spark DataFrame, then use a user-defined function to truncate the column values (Scala):
import com.amazonaws.services.glue.DynamicFrame
import org.apache.spark.sql.functions._
// Truncate values longer than 20 characters (example threshold); the null guard avoids an NPE
val truncColUdf = udf((str: String) => if (str != null && str.length > 20) str.substring(0, 20) else str)
val truncDataFrame = dynamicFrame.toDF()
  .select("text_long")
  .withColumn("text_short", truncColUdf(col("text_long")))
  .withColumn("text_short_length", length(col("text_short")))
truncDataFrame.show(5, false)
// Convert back to a DynamicFrame before writing
val truncDynamicFrame = DynamicFrame(truncDataFrame, glueContext)
...
//write to sink
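The elided write step can follow the same getJDBCSink pattern as in the full example above; a minimal sketch, assuming a catalog connection "databaseConnection" and target table "entry" (both placeholders):
// Write the truncated DynamicFrame to Redshift (connection and table names are assumptions)
val datasink = glueContext.getJDBCSink(catalogConnection = "databaseConnection", options = JsonOptions("""{"dbtable": "entry", "database": "database"}"""), redshiftTmpDir = args("TempDir"), transformationContext = "datasink").writeDynamicFrame(truncDynamicFrame)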
Output:
+-----------------------+--------------------+-----------------+
|text_long |text_short |text_short_length|
+-----------------------+--------------------+-----------------+
|I'd rather not answer |I'd rather not answe|20 |
|Agree |Agree |5 |
|Custom Answer Favorable|Custom Answer Favora|20 |
|Agree |Agree |5 |
|Sometimes |Sometimes |9 |
+-----------------------+--------------------+-----------------+
You can pass "TRUNCATECOLUMNS" in the "extracopyoptions" parameter of the DynamicFrameWriter: https://aws.amazon.com/premiumsupport/knowledge-center/sql-commands-redshift-glue-job/
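A minimal sketch of what that could look like with the Scala sink from the first answer (connection and table names are assumptions; "extracopyoptions" is appended to the COPY command Glue issues against Redshift):
val datasink = glueContext.getJDBCSink(catalogConnection = "databaseConnection", options = JsonOptions("""{"dbtable": "entry", "database": "database", "extracopyoptions": "TRUNCATECOLUMNS"}"""), redshiftTmpDir = args("TempDir"), transformationContext = "datasink").writeDynamicFrame(dropnullfields30)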