如何在 Spark SQL Java 中将 CSV 类型的字符串转换为数据帧?

How to transform CSV type string to dataframe in Spark SQL Java?

我使用 Spark 结构化流 API 制作 Spark Java 客户端代码。这些代码从 Kafka

中提取 CSV 类型的字符串
SparkSession spark = SparkSession.builder().master("local[*]").appName("KafkaMongoStream").getOrCreate();
        
Dataset<Row> df = spark.read().format("kafka").option("kafka.bootstrap.servers", "localhost:9092"))
            .option("subscribe", "topicForMongoDB")
            .option("startingOffsets", "earliest")
            .load()
            .selectExpr("CAST(value AS STRING)");
            
df.show();

返回结果成功。这些代码打印 CSV 类型字符串。

+--------------------+
|               value|
+--------------------+
|realtime_start,re...|
|2021-01-27,2021-0...|
|2021-01-27,2021-0...|
|2021-01-27,2021-0...|
|2021-01-27,2021-0...|
|2021-01-27,2021-0...|

然后我尝试将这些字符串转换为 Spark SQL 中的 Spark 数据帧。首先,下面的代码是 Java POJO class

public class EntityMongoDB implements Serializable {

    private Date date;
    private float value;
    private String id;
    private String title;
    private String state;
    private String frequency_short;
    private String units_short;
    private String seasonal_adjustment_short;
    
    private static StructType structType = DataTypes.createStructType(new StructField[] {
              
              DataTypes.createStructField("date", DataTypes.DateType, false),
              DataTypes.createStructField("value", DataTypes.FloatType, false),
              DataTypes.createStructField("id", DataTypes.StringType, false),
              DataTypes.createStructField("title", DataTypes.StringType, false),
              DataTypes.createStructField("state", DataTypes.StringType, false),
              DataTypes.createStructField("frequency_short", DataTypes.StringType, false),
              DataTypes.createStructField("units_short", DataTypes.StringType, false),
              DataTypes.createStructField("seasonal_adjustment_short", DataTypes.StringType, false)
    });
    
    public static StructType getStructType() {
        return structType;
    }
}

然后我编写代码将那些 CSV 类型的字符串转换为数据帧

Dataset<Row> dfs = df.select(from_json(col("value"), EntityMongoDB.getStructType())
        .as("entityMongoDB"))
        .selectExpr("entityMongoDB.date", "entityMongoDB.value", "entityMongoDB.id", 
                "entityMongoDB.title", "entityMongoDB.state", "entityMongoDB.frequency_short", 
                "entityMongoDB.units_short", "entityMongoDB.seasonal_adjustment_short").toDF();

dfs.show();
dfs.printSchema();

打印的架构是正确的。

 |-- date: date (nullable = true)
 |-- value: float (nullable = true)
 |-- id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- state: string (nullable = true)
 |-- frequency_short: string (nullable = true)
 |-- units_short: string (nullable = true)
 |-- seasonal_adjustment_short: string (nullable = true)

但是生成的列充满了空值:

+----+-----+----+-----+-----+---------------+-----------+-------------------------+
|date|value|  id|title|state|frequency_short|units_short|seasonal_adjustment_short|
+----+-----+----+-----+-----+---------------+-----------+-------------------------+
|null| null|null| null| null|           null|       null|                     null|
|null| null|null| null| null|           null|       null|                     null|
|null| null|null| null| null|           null|       null|                     null|
|null| null|null| null| null|           null|       null|                     null|
|null| null|null| null| null|           null|       null|                     null|

我认为 dataframe 的模式生成正确,但提取数据部分有一些问题。

您在 value 列中的字符串无效 JSON,因此 from_json 在这里不起作用。

对于 Spark 3+,您可以使用 from_csv,正如@mck 在评论中指出的那样:

Dataset<Row> dfs = df.select(from_csv(col("value"), EntityMongoDB.getStructType())
        .as("entityMongoDB"))
        .selectExpr("entityMongoDB.*").toDF(); 

对于 3 之前的 Spark 版本,您可以 split 逗号形式的值,然后将结果数组转换为多列:

Dataset<Row> dfs = df.select(split(col("value"), ",").as("values"))
        .select(IntStream.range(0, 7).map(i -> col("values").getItem(i)).toArray())
        .toDF("date", "value", "id", "title", "state", "frequency_short", "units_short", "seasonal_adjustment_short"); 

此外,您的值中似乎有列名,您可以过滤掉该行。