Hadoop reducer not considering two equal custom Writable objects as equal

I am trying to write a MapReduce program that finds mutual friends. I am using a custom Writable (FriendPair) as the key.

Given the following input

Tom Jerry,John
John Jerry,Sarah,Tom

it should output Jerry as the mutual friend of Tom and John:

[John,Tom]    Jerry
[John,Sarah]    
[John,Jerry]
[Tom,Jerry] 

Instead, the MapReduce job outputs the following:

[John,Tom]  
[John,Sarah]    
[John,Jerry]    
[Tom,John]  
[Tom,Jerry]

The keys [John,Tom] and [Tom,John] are being treated as unequal.

The code is below.

Custom Writable

    public class FriendPair implements WritableComparable<FriendPair> {
        
        Text friend1;
        Text friend2;
        
        public FriendPair() {
            this.friend1 = new Text("");
            this.friend2 = new Text("");
        }
        
        public FriendPair(Text friend1, Text friend2) {
            this.friend1 = friend1;
            this.friend2 = friend2;
        }
        
        public Text getFriend1() {
            return friend1;
        }
        public void setFriend1(Text friend1) {
            this.friend1 = friend1;
        }
        public Text getFriend2() {
            return friend2;
        }
        public void setFriend2(Text friend2) {
            this.friend2 = friend2;
        }
    
        @Override
        public void write(DataOutput out) throws IOException {
            friend1.write(out);
            friend2.write(out);
        }
    
        @Override
        public void readFields(DataInput in) throws IOException {
            friend1.readFields(in);
            friend2.readFields(in);
        }
    
        @Override
        public int compareTo(FriendPair pair2) {
            return ((friend1.compareTo(pair2.getFriend2()) == 0 && friend2.compareTo(pair2.getFriend1()) == 0)
                   || (friend1.compareTo(pair2.getFriend1()) == 0 && friend2.compareTo(pair2.getFriend2()) == 0)) ? 0 : -1;
        }
    
        @Override
        public boolean equals(Object o) {
            FriendPair pair2 = (FriendPair) o;
            return (friend1.equals(pair2.getFriend2()) && friend2.equals(pair2.getFriend1()) 
                    || friend1.equals(pair2.getFriend1()) && friend2.equals(pair2.getFriend2()));
        }
        
        @Override
        public String toString() {
            return "[" + friend1 + "," + friend2 + "]";
        }
        
        @Override
        public int hashCode() {
            return friend1.hashCode() + friend2.hashCode();
        }
    
    }

Mapper

public class MutualFriendsMapper extends Mapper<LongWritable, Text, FriendPair, Text> {

    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        String line = value.toString();
        String[] items = line.split("\t");

        String name = items[0];
        String friendsList = items[1];
        String[] friends = friendsList.split(",");
        for (String friend : friends) {
            FriendPair fp = new FriendPair(new Text(name), new Text(friend));
            FriendPair fp2 = new FriendPair(new Text(friend), new Text(name));
            context.write(fp, new Text(friendsList));
        }
    }
}

Reducer

public class MutualFriendsReducer extends Reducer<FriendPair, Text, FriendPair, FriendArray> {

    @Override
    public void reduce(FriendPair key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        
        List<String> allFriends = new ArrayList<String>();
        for(Text value : values) {
            String[] valueArray = value.toString().split(",");
            allFriends.addAll(Arrays.asList(valueArray));
        }
        List<Text> commonFriends = new ArrayList<Text>();
        Set<String> uniqueFriendSet = new HashSet<String>(allFriends);
        for(String friend : uniqueFriendSet) {
            int frequency = Collections.frequency(allFriends, friend);
            if(frequency > 1) {
                commonFriends.add(new Text(friend));
            }
        }
        
        context.write(key, new FriendArray(Text.class, commonFriends.toArray(new Text[commonFriends.size()])));
    }
}

FriendArray (output)

public class FriendArray extends ArrayWritable {

    public FriendArray(Class<? extends Writable> valueClass, Writable[] values) {
        super(valueClass, values);
    }
    
    public FriendArray(Class<? extends Writable> valueClass) {
        super(valueClass);
    }
    
    public FriendArray() {
        super(Text.class);
    }

    @Override
    public Text[] get() {
        return (Text[]) super.get();
    }
    
    @Override
    public void write(DataOutput data) throws IOException {
        for(Text t : get()) {
            t.write(data);
        }
    }
    
    @Override
    public String toString() {
        Text[] friendArray = Arrays.copyOf(get(), get().length, Text[].class);
        String print="";
        
        for(Text f : friendArray) 
            print+=f+",";
        
        return print;
    }
}

Any help is appreciated.

During the sort phase, Hadoop does not operate on the Java objects themselves, only on their byte representation (the output of the FriendPair.write() method), so it cannot call FriendPair.equals(). Therefore, for Hadoop to treat the keys [John,Tom] and [Tom,John] as equal, you have to make sure their write() output is identical. One way to achieve that is to enforce an ordering of the friends within a pair, for example by sorting them alphabetically (both pairs then become [John,Tom]).
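
As a minimal sketch of that idea (one possible rework of FriendPair, not the only fix), the constructor can normalize the order of the two names and compareTo can compare field by field, which gives the proper total order that the sort/group phase relies on:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

public class FriendPair implements WritableComparable<FriendPair> {

    Text friend1;
    Text friend2;

    public FriendPair() {
        this.friend1 = new Text("");
        this.friend2 = new Text("");
    }

    public FriendPair(Text friendA, Text friendB) {
        // Store the lexicographically smaller name first, so [Tom,John] and
        // [John,Tom] end up with identical field values and identical write() output.
        if (friendA.compareTo(friendB) <= 0) {
            this.friend1 = friendA;
            this.friend2 = friendB;
        } else {
            this.friend1 = friendB;
            this.friend2 = friendA;
        }
    }

    @Override
    public void write(DataOutput out) throws IOException {
        friend1.write(out);
        friend2.write(out);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        friend1.readFields(in);
        friend2.readFields(in);
    }

    @Override
    public int compareTo(FriendPair other) {
        // Field-by-field comparison; grouping in the reducer is driven by this
        // method (via the key comparator), not by equals(). Returning only 0 or -1,
        // as in the original, breaks the sort contract.
        int cmp = friend1.compareTo(other.friend1);
        return (cmp != 0) ? cmp : friend2.compareTo(other.friend2);
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof FriendPair)) {
            return false;
        }
        FriendPair other = (FriendPair) o;
        return friend1.equals(other.friend1) && friend2.equals(other.friend2);
    }

    @Override
    public int hashCode() {
        // Unchanged: pairs that are equal after ordering hash to the same partition.
        return friend1.hashCode() + friend2.hashCode();
    }

    @Override
    public String toString() {
        return "[" + friend1 + "," + friend2 + "]";
    }
}

With the ordering done in the constructor, the unused fp2 in the mapper is no longer needed, because new FriendPair(new Text(friend), new Text(name)) now produces the same key as new FriendPair(new Text(name), new Text(friend)). A custom grouping comparator (set via Job.setGroupingComparatorClass) could also merge the two orderings, but normalizing the key itself is simpler.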