Spark 2.2.0:如何从列表列的数据集中删除特定的重复项

Spark 2.2.0: How to remove Specific Duplicates from a Dataset of a list column

大家早上好,祝大家有个愉快的一天。我有一个如下所示的数据集

+-----+------------+
|Text2|Set         |
+-----+------------+
|seven|[3]         |
|one  |[1, 5, 2, 3]|
|six  |[5]         |
|five |[5, 2, 4]   |
+-----+------------+

我想做的是从 Colum 集中删除特定的重复项。例如,假设我想一起删除数字 3 和 4。

想象一下上面的输出。我们可以看到 Set

列的每一行都删除了 3,4
+-----+------------+
|Text2|Set         |
+-----+------------+
|one  |[1, 5, 2]   |
|six  |[5]         |
|five |[5, 2]      |
+-----+------------+

使用数据集 api 最有效的方法是什么?因为假设我需要对大数据进行此操作

我的想法是先映射,然后执行 reduce 作为实现它的最有效方法,但我不确定我现在所说的是不是一个愚蠢的解决方案。

这是我的 Java 代码示例

 List<Row> data = Arrays.asList(
                RowFactory.create("seven", Arrays.asList(3)),
                RowFactory.create("one", Arrays.asList(1, 5, 2, 3)),
                RowFactory.create("six", Arrays.asList(5)),
                RowFactory.create("five", Arrays.asList(5, 2, 4))
        );

        StructType schema = new StructType(new StructField[]{
                new StructField("Text2", DataTypes.StringType, false, Metadata.empty()),
                new StructField("Set", DataTypes.createArrayType(DataTypes.IntegerType), false, Metadata.empty())
        });

        Dataset<Row> df = spark.createDataFrame(data, schema);
        df.show(false);

如果有人能根据我的问题给我一个解决方案,我将不胜感激

假设 df 是您的初始数据框,代码按照您期望的方式进行过滤。

Dataset<Row> df = sparkSession.createDataFrame(data, schema);

    UDF3<WrappedArray<Integer>,Integer,Integer,List<Integer>> filterFunction =  (WrappedArray<Integer> input, Integer filtVal1,Integer filtVal2) -> {
        List<Integer> newLst= new ArrayList<>(JavaConversions.asJavaList(input));
        newLst.removeIf(x -> x==filtVal1 || x==filtVal2);
        return newLst;
    };

    sparkSession.udf().register("filterFunction", filterFunction, DataTypes.createArrayType(DataTypes.IntegerType));

    Dataset<Row> filteredDf= df.withColumn("Set_temp", functions.callUDF("filterFunction", df.col("Set"),functions.lit(3),functions.lit(4))).drop("Set").withColumnRenamed("Set_temp", "Set").filter("size(Set_temp)>0");

    filteredDf.show();

  +-----+---------+
  |Text2|      Set|
   +-----+---------+
  |  one|[1, 5, 2]|
  |  six|      [5]|
  | five|   [5, 2]|
  +-----+---------+