Filter JavaRDD based on an ArrayList of index ids

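I have a Dataset df whose rows are keyed by accountId, and I also have an ArrayList of account ids. How can I filter or map the Dataset to create a new Dataset that contains only the rows whose account id appears in the list?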

I am using Java 8.

List<String> accountIdList= new ArrayList<String>();
accountIdList.add("1001");
accountIdList.add("1002");
accountIdList.add("1003");
accountIdList.add("1004");
Dataset<Row> filteredRows=  df.filter(p-> df.col("accountId").equals(accountIdList));

I am trying to pass the list itself to the comparison operator. Is this the right approach?

If you are looking for the Java syntax:

Dataset<Row> filteredRows=  df.where(df.col("accountId").isin(accountIdList.toArray()));
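The call to toArray() matters because Column.isin is declared with a varargs parameter (Object...). A minimal sketch of the difference in plain Java, without Spark: countValues is a hypothetical stand-in that only shares the parameter shape of isin, and VarargsSpread is an illustrative class name.

```java
import java.util.ArrayList;
import java.util.List;

public class VarargsSpread {

    // Hypothetical stand-in with the same parameter shape as Column.isin(Object... list).
    // It only reports how many values the method actually received.
    public static int countValues(Object... values) {
        return values.length;
    }

    public static void main(String[] args) {
        List<String> accountIdList = new ArrayList<>();
        accountIdList.add("1001");
        accountIdList.add("1002");
        accountIdList.add("1003");
        accountIdList.add("1004");

        // The List is a single Object, so it is wrapped into a one-element array.
        System.out.println(countValues(accountIdList));           // prints 1

        // toArray() returns an Object[], which is passed through as the varargs array.
        System.out.println(countValues(accountIdList.toArray())); // prints 4
    }
}
```

Passing the list directly would therefore compare each accountId against one value (the list object), not against the four ids.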

Use the Column.isin method:

import scala.collection.JavaConversions;
import static org.apache.spark.sql.functions.*;

Dataset<Row> filteredRows = df.where(col("accountId").isin(
  JavaConversions.asScalaIterator(accountIdList.iterator()).toSeq()
));

Here is working code in Java. I hope it helps.

Here is the content of my sample input file:

1001

1008

1005

1009

1010

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SparkSession;

public class DatasetFilter {

    private static List<String> sampleList = new ArrayList<String>();

    public static void main(String[] args)
    {
        sampleList.add("1001");
        sampleList.add("1002");
        sampleList.add("1003");
        sampleList.add("1004");
        sampleList.add("1005");

        SparkSession sparkSession = SparkSession.builder()
                .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                .config("spark.sql.warehouse.dir", "file:///C:/Users/user/workspace/Validation/spark-warehouse")
                .master("local[*]").getOrCreate();

        //Read the source-file.
        Dataset<String> src = sparkSession.read().textFile("C:\\Users\\user\\Desktop\\dataSetFilterTest.txt");
        src.show(10);

        //Apply filter
        Dataset<String> filteredSource = src.filter(new FilterFunction<String>() {

            private static final long serialVersionUID = 1L;

            @Override
            public boolean call(String value) throws Exception {
                System.out.println("***************************************");
                boolean status = false;
                Iterator<String> iterator = sampleList.iterator();
                while (iterator.hasNext()) {
                    String val = iterator.next();
                    System.out.println("Val is :: " + val + " Value is :: " + value);
                    if (value.equalsIgnoreCase(val)) {
                        status = true;
                        break;
                    }
                }
                return status;
            }
        });

        filteredSource.show();

        System.out.println("Completed the job :)");
    }

}
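The loop inside call() scans the whole list for every row. A possible simplification is to keep the ids in a HashSet and test membership directly. The sketch below shows only that predicate in plain Java, with a stream standing in for the Dataset so it runs without Spark. AccountIdMatcher, matches and keepMatching are hypothetical names, and the ids and input lines are the sample values from the answer. It uses exact matching rather than equalsIgnoreCase, which makes no difference for purely numeric ids.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class AccountIdMatcher {

    // Same ids as sampleList in the answer, held in a HashSet for constant-time lookups.
    private static final Set<String> ACCOUNT_IDS =
            new HashSet<>(Arrays.asList("1001", "1002", "1003", "1004", "1005"));

    // Hypothetical helper: the check that FilterFunction<String>.call could delegate to.
    public static boolean matches(String value) {
        return value != null && ACCOUNT_IDS.contains(value);
    }

    // Stand-in for Dataset.filter: keeps only the lines whose value is a known id.
    public static List<String> keepMatching(List<String> lines) {
        return lines.stream()
                .filter(AccountIdMatcher::matches)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Sample input file content from the answer.
        List<String> lines = Arrays.asList("1001", "1008", "1005", "1009", "1010");
        System.out.println(keepMatching(lines)); // prints [1001, 1005]
    }
}
```

In the Spark job, the body of call() could then be reduced to a single return of this check, assuming the set is available to the code that runs the filter.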

Output: