VarcharType 不匹配 Spark 数据帧

VarcharType mismatch Spark dataframe

我正在尝试更改数据框的架构。每次我有一列字符串类型时,我想将其类型更改为 VarcharType(max),其中 max 是该列中字符串的最大长度。我写了下面的代码。 (我想稍后将数据框导出到 sql 服务器,我不想在 sql 服务器中使用 nvarchar,所以我试图在 spark 端限制它)

val df = spark.sql(s"SELECT * FROM $tableName")

var l : List [StructField] = List()

val schema = df.schema
schema.fields.foreach(x => {

  if (x.dataType == StringType) {
    val dataColName = x.name
    val maxLength = df.select(dataColName).reduce((x, y) => {
      if (x.getString(0).length >= y.getString(0).length) {
        x
      } else {
        y
      }
    }).getString(0).length

    val dataType = VarcharType(maxLength)
    l = l :+ StructField(dataColName, dataType)
  } else {
    l = l :+ x
  }
})

val newSchema = StructType(l)
val newDf = spark.createDataFrame(df.rdd, newSchema)

然而,当 运行 它时,我得到这个错误。

  20/01/22 15:29:44 ERROR ApplicationMaster: User class threw exception: scala.MatchError: 
  VarcharType(9) (of class org.apache.spark.sql.types.VarcharType)
  scala.MatchError: VarcharType(9) (of class org.apache.spark.sql.types.VarcharType)

数据框列可以是 VarcharType(n) 类型吗?

来自数据库 to/from 数据帧的数据映射发生在方言 class 中。对于 MS SQL 服务器,class 是 org.apache.spark.sql.jdbc.MsSqlServerDialect。您可以继承并覆盖 getJDBCType 以影响从数据帧到 table 的数据类型映射。然后注册你的方言使其生效。

我已经为 Oracle(不是 sqlserver)完成了这个,但是也可以类似地完成。

//Change this
  override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
    case TimestampType => Some(JdbcType("DATETIME", java.sql.Types.TIMESTAMP))
    case StringType => Some(JdbcType("NVARCHAR(MAX)", java.sql.Types.NVARCHAR))
    case BooleanType => Some(JdbcType("BIT", java.sql.Types.BIT))
    case _ => None
  }

您不能使用 VarcharType,因为它不是 DataType。您也无法检查实际数据的长度,因为它没有公开。您只能访问 "dt: DataType",因此如果 max 不是 acceptable.

,您可以为 NVARCHAR 设置默认大小