按值的列表公共元素对键进行分组

Grouping keys by value's list common elements

有这样的地图:

K1 -> [V1, V2]
K2 -> [V2, V3]
K3 -> [V3]
K4 -> [V4]

因此,我想要一组分组键,这些键至少具有值列表中的一个公共元素。解决方案应支持传递关系(G1 组):

G1 = [K1, K2, K3]
G2 = [K4]

我卡在描述的错误 here 上了。在Spark中如何实现?

我的代码如下:

public class Grouping implements Serializable {

    public void group(JavaSparkContext sc) {
        List<Mapping> list = newArrayList();
        list.add(new Mapping("K1", newArrayList("V1", "V2")));
        list.add(new Mapping("K2", newArrayList("V2", "V3")));
        list.add(new Mapping("K3", newArrayList("V3")));
        list.add(new Mapping("K4", newArrayList("V4")));

        JavaRDD<Tuple2<Mapping, String>> pairs = sc.parallelize(list).map(Mapping::toPairs).flatMap(p -> p);
        JavaPairRDD<String, Iterable<Mapping>> valuesToMappings = pairs.groupBy(Tuple2::_2).
            mapToPair(t -> new Tuple2<>(t._1, stream(t._2).map(tt -> tt._1).collect(toList())));

        JavaRDD<Group> map = valuesToMappings.map(t -> new Group(traverse(newHashSet(t._2.iterator()), valuesToMappings)));

        System.out.println(map.collect());
    }

    private Set<Mapping> traverse(Set<Mapping> mappings, JavaPairRDD<String, Iterable<Mapping>> valuesToMappings) {
        Set<String> values = mappings.stream().map(Mapping::getValues).flatMap(Collection::stream).collect(toSet());
        Set<Mapping> mappingsHavingValues = mappingsHavingValues(values, valuesToMappings);
        while (!mappings.equals(mappingsHavingValues)) {
            mappingsHavingValues = mappingsHavingValues(values, valuesToMappings);
        }

        return mappingsHavingValues;
    }

    private Set<Mapping> mappingsHavingValues(Set<String> values, JavaPairRDD<String, Iterable<Mapping>> valuesToMappings) {
        Set<Mapping> result = newHashSet();
        for (String value : values) {
            List<Iterable<Mapping>> lookup = valuesToMappings.lookup(value);
            result.addAll(newArrayList(lookup.get(0))); //here I get an exception
        }
        return result;
    }

    public <T> Stream<T> stream(Iterable<T> in) {
        return StreamSupport.stream(in.spliterator(), false);
    }
}

public class Mapping implements Serializable {
    private String key;
    private List<String> values;

    public Mapping(String key, List<String> values) {
        this.key = key;
        this.values = values;
    }

    public String getKey() {
        return key;
    }

    public List<String> getValues() {
        return values;
    }

    public List<Tuple2<Mapping, String>> toPairs() {
        return getValues().stream().map(v -> new Tuple2<>(this, v)).collect(toList());
    }
}

public class Group {

    private Set<Mapping> mappings;

    public Group(Set<Mapping> mappings) {
        this.mappings = mappings;
    }

    public Set<Mapping> getMappings() {
        return mappings;
    }
}

您正在寻找图中的连通分量。 org.apache.spark.graphx.lib.ConnectedComponents为此提供分布式解决方案。