Q : Dataset<Row> flatmap to empty dataset Spark Java

I want to remove the duplicate values in my dataset.

For example:

+----------+---------------+--------------------+--------------------+---------+----+-------------+
|     e_key|f_timestamp_day|                 key|               value|f_country|f_os|received_date|
+----------+---------------+--------------------+--------------------+---------+----+-------------+
|    Tryout|     2020-04-01|      item_guid_list|            a^a^a^b |       FR| iOS|   2020-04-01|
|    Tryout|     2020-04-01|            sku_list|         c^c^d^e^f^f|       FR| iOS|   2020-04-01|
+----------+---------------+--------------------+--------------------+---------+----+-------------+

should become:

+----------+---------------+--------------------+--------------------+---------+----+-------------+
|     e_key|f_timestamp_day|                 key|               value|f_country|f_os|received_date|
+----------+---------------+--------------------+--------------------+---------+----+-------------+
|    Tryout|     2020-04-01|      item_guid_list|                  a |       FR| iOS|   2020-04-01|
|    Tryout|     2020-04-01|      item_guid_list|                  b |       FR| iOS|   2020-04-01|
|    Tryout|     2020-04-01|            sku_list|                  c |       FR| iOS|   2020-04-01|
|    Tryout|     2020-04-01|            sku_list|                  d |       FR| iOS|   2020-04-01|
|    Tryout|     2020-04-01|            sku_list|                  e |       FR| iOS|   2020-04-01|
|    Tryout|     2020-04-01|            sku_list|                  f |       FR| iOS|   2020-04-01|
+----------+---------------+--------------------+--------------------+---------+----+-------------+

But when I use flatMap, the result is:

++
||
++
||
||
||
||
||

My code is:

            StructType structType = new StructType();
            structType.add("e_key", DataTypes.StringType);
            structType.add("f_timestamp_day", DataTypes.StringType);
            structType.add("key", DataTypes.StringType);
            structType.add("value", DataTypes.StringType);
            structType.add("f_country", DataTypes.StringType);
            structType.add("f_os", DataTypes.StringType);
            structType.add("received_date", DataTypes.StringType);


            Dataset<Row> drop_duplicate_feature = 
            explode_events.flatMap(
                (FlatMapFunction<Row, Row>)row->{
                    List<Row> list = new ArrayList<Row>();
                    String value = row.getString(3);
                    String[] array_of_value = value.split("\\^");
                    array_of_value = new HashSet<String>(Arrays.asList(array_of_value)).toArray(new String[0]);
                    for(int index = 0; index < array_of_value.length; index++){
                        list.add(
                            RowFactory.create(row.get(0),row.get(1),row.get(2),array_of_value[index],row.get(4),row.get(5),row.get(6))
                        );
                    }
                    return list.iterator();
                }
                , RowEncoder.apply(structType)
            );

I use flatMap to generate the distinct rows and add them to a list.

Why doesn't RowEncoder.apply() work?
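
A likely cause, for reference: StructType in Spark is immutable, so add(...) returns a new StructType and leaves the receiver unchanged. The schema built above therefore stays empty, and the encoder emits rows with no columns. A minimal sketch of the chained construction, assuming that is indeed the issue:

import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

// add(...) returns a new StructType, so chain the calls and keep the result
StructType structType = new StructType()
        .add("e_key", DataTypes.StringType)
        .add("f_timestamp_day", DataTypes.StringType)
        .add("key", DataTypes.StringType)
        .add("value", DataTypes.StringType)
        .add("f_country", DataTypes.StringType)
        .add("f_os", DataTypes.StringType)
        .add("received_date", DataTypes.StringType);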

You can get the same result using Spark SQL and org.apache.spark.sql.functions:

explode_events.select(
    $"e_key",
    $"f_timestamp_day",
    $"key",
    explode(split($"value","\\^")),
    $"f_country",
    $"f_os",
    $"received_date"
).show()
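
The snippet above is Scala; since the question is in Java, a rough Java equivalent (an untested sketch using org.apache.spark.sql.functions on the question's explode_events dataset) could look like:

import static org.apache.spark.sql.functions.*;

explode_events.select(
        col("e_key"),
        col("f_timestamp_day"),
        col("key"),
        explode(split(col("value"), "\\^")).as("value"),
        col("f_country"),
        col("f_os"),
        col("received_date")
).show();

Note that explode over the raw split keeps the duplicates inside each value; wrapping the split in array_distinct, as shown further below, removes them as well.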

Try this -

1. Load the test data provided

 String data = "   e_key|f_timestamp_day|                 key|               value|f_country|f_os|received_date\n" +
                "  Tryout|     2020-04-01|      item_guid_list|            a^a^a^b |       FR| iOS|   2020-04-01\n" +
                "  Tryout|     2020-04-01|            sku_list|         c^c^d^e^f^f|       FR| iOS|   2020-04-01";

        List<String> list1 = Arrays.stream(data.split(System.lineSeparator()))
                .map(s -> Arrays.stream(s.split("\\|"))
                        .map(s1 -> s1.replaceAll("^[ \t]+|[ \t]+$", ""))
                        .collect(Collectors.joining(","))
                )
                .collect(Collectors.toList());

        Dataset<Row> dataset = spark.read()
                .option("header", true)
                .option("inferSchema", true)
                .option("sep", ",")
                .option("nullValue", "null")
                .csv(spark.createDataset(list1, Encoders.STRING()));
        dataset.show(false);
        dataset.printSchema();
        /**
         * +------+-------------------+--------------+-----------+---------+----+-------------------+
         * |e_key |f_timestamp_day    |key           |value      |f_country|f_os|received_date      |
         * +------+-------------------+--------------+-----------+---------+----+-------------------+
         * |Tryout|2020-04-01 00:00:00|item_guid_list|a^a^a^b    |FR       |iOS |2020-04-01 00:00:00|
         * |Tryout|2020-04-01 00:00:00|sku_list      |c^c^d^e^f^f|FR       |iOS |2020-04-01 00:00:00|
         * +------+-------------------+--------------+-----------+---------+----+-------------------+
         *
         * root
         *  |-- e_key: string (nullable = true)
         *  |-- f_timestamp_day: timestamp (nullable = true)
         *  |-- key: string (nullable = true)
         *  |-- value: string (nullable = true)
         *  |-- f_country: string (nullable = true)
         *  |-- f_os: string (nullable = true)
         *  |-- received_date: timestamp (nullable = true)
         */

2. Take the distinct values from the array and explode it

        dataset.withColumn("value", explode(array_distinct(split(col("value"), "\\^"))))
                .show(false);
        /**
         * +------+-------------------+--------------+-----+---------+----+-------------------+
         * |e_key |f_timestamp_day    |key           |value|f_country|f_os|received_date      |
         * +------+-------------------+--------------+-----+---------+----+-------------------+
         * |Tryout|2020-04-01 00:00:00|item_guid_list|a    |FR       |iOS |2020-04-01 00:00:00|
         * |Tryout|2020-04-01 00:00:00|item_guid_list|b    |FR       |iOS |2020-04-01 00:00:00|
         * |Tryout|2020-04-01 00:00:00|sku_list      |c    |FR       |iOS |2020-04-01 00:00:00|
         * |Tryout|2020-04-01 00:00:00|sku_list      |d    |FR       |iOS |2020-04-01 00:00:00|
         * |Tryout|2020-04-01 00:00:00|sku_list      |e    |FR       |iOS |2020-04-01 00:00:00|
         * |Tryout|2020-04-01 00:00:00|sku_list      |f    |FR       |iOS |2020-04-01 00:00:00|
         * +------+-------------------+--------------+-----+---------+----+-------------------+
         */

Split the value column using the split function, then use the array_distinct and explode functions to achieve the result.

from pyspark.sql.functions import *
#create df1
df1 = df.withColumn("value", explode(array_distinct(split("value", "\\^"))))