在 spark 数据集中转换数据时数据类型不匹配

Data type mismatch while transforming data in spark dataset

我使用 spark 从 csv 文件创建了镶木地板结构:

Dataset<Row> df = park.read().format("com.databricks.spark.csv").option("inferSchema", "true")
            .option("header", "true").load("sample.csv");
df.write().parquet("sample.parquet");

我正在阅读 parquet 结构,我正在尝试转换数据集中的数据:

Dataset<org.apache.spark.sql.Row> df = spark.read().parquet("sample.parquet");
df.createOrReplaceTempView("tmpview");
Dataset<Row> namesDF = spark.sql("SELECT *, md5(station_id) as hashkey FROM tmpview");

不幸的是,我收到数据类型不匹配错误。我必须明确分配数据类型吗?

17/04/12 09:21:52 INFO SparkSqlParser: Parsing command: SELECT *, md5(station_id) as hashkey FROM tmpview Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve 'md5(tmpview.station_id)' due to data type mismatch: argument 1 requires binary type, however, 'tmpview.station_id' is of int type.; line 1 pos 10; 'Project [station_id#0, bikes_available#1, docks_available#2, time#3, md5(station_id#0) AS hashkey#16] +- SubqueryAlias tmpview, tmpview +- Relation[station_id#0,bikes_available#1,docks_available#2,time#3] parquet

是的,根据 Spark documentationmd5 函数仅适用于 binary (text/string) 列,因此您需要将 station_id 转换为 string 在应用 md5 之前。在 Spark SQL 中,您可以将 md5cast 链接在一起,例如:

Dataset<Row> namesDF = spark.sql("SELECT *, md5(cast(station_id as string)) as hashkey FROM tmpview");

或者您可以在数据框中创建一个新列并在其上应用 md5,例如:

val newDf = df.withColumn("station_id_str", df.col("station_id").cast(StringType))
newDf.createOrReplaceTempView("tmpview");
Dataset<Row> namesDF = spark.sql("SELECT *, md5(station_id_str) as hashkey FROM tmpview");

如果您在 Databricks(Azure Databricks 笔记本)上遇到此错误 3.x 集群上的 运行 以下代码段可能适合您。

md5(cast(concat_ws('some string' + 'some string') + """)as BINARY))