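How to sort comma-separated keys in Reducer output?
I am running an RFM analysis program using MapReduce. The OutputKeyClass is Text.class, and I emit comma-separated R (Recency), F (Frequency), M (Monetary) values as the key from the Reducer, where R=BigInteger, F=BigInteger, M=BigDecimal. The value is also a Text, representing the Customer_ID. I know that Hadoop sorts output based on keys, but my final result is a bit weird. I want the output keys to be sorted by R first, then by F, then by M. For some unknown reason, however, I am getting the following output order: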
545,1,7652 100000
545,23,390159.402343750 100001
452,13,132586 100002
452,4,32202 100004
452,1,9310 100007
452,1,4057 100018
452,3,18970 100021
But I want the following output:
545,23,390159.402343750 100001
545,1,7652 100000
452,13,132586 100002
452,4,32202 100004
452,3,18970 100021
452,1,9310 100007
452,1,4057 100018
Note: customer_ID is the key in the Map phase, and all RFM values belonging to a particular Customer_ID are brought together in the Reducer for aggregation.
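One thing worth keeping in mind here: even where a comma-separated Text does act as the sort key, it is compared as character data, not as three numbers. The sketch below uses plain String.compareTo as a stand-in for that ordering (an assumption made only to keep the example free of Hadoop dependencies; for ASCII input both orderings are lexicographic). The class and method names are hypothetical.

```java
class TextOrderSketch {
    // Compares the F field (second comma-separated part) as a number.
    static int compareFNumerically(String key1, String key2) {
        long f1 = Long.parseLong(key1.split(",")[1]);
        long f2 = Long.parseLong(key2.split(",")[1]);
        return Long.compare(f1, f2);
    }

    public static void main(String[] args) {
        String a = "452,13,132586";
        String b = "452,4,32202";
        // Lexicographic: '1' < '4', so a sorts before b although 13 > 4.
        System.out.println(a.compareTo(b) < 0);            // true
        // Numeric: 13 is greater than 4.
        System.out.println(compareFNumerically(a, b) > 0); // true
    }
}
```

So a string key cannot give a numeric R, F, M ordering without padding or a custom comparator, which is what motivates a composite key with numeric fields.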
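After a lot of searching I found some useful material, and I am posting what I compiled from it:
You have to start with your own custom data type. Since I had three comma-separated values that needed to be sorted in descending order, I had to create a TextQuadlet.java data type in Hadoop. I made it a quadlet because the first part of the key is the natural key and the remaining three parts are R, F, M: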
import java.io.*;
import org.apache.hadoop.io.*;
public class TextQuadlet implements WritableComparable<TextQuadlet> {
private String customer_id;
private long R;
private long F;
private double M;
public TextQuadlet() {
}
public TextQuadlet(String customer_id, long R, long F, double M) {
set(customer_id, R, F, M);
}
public void set(String customer_id2, long R2, long F2, double M2) {
this.customer_id = customer_id2;
this.R = R2;
this.F = F2;
this.M=M2;
}
public String getCustomer_id() {
return customer_id;
}
public long getR() {
return R;
}
public long getF() {
return F;
}
public double getM() {
return M;
}
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(this.customer_id);
out.writeLong(this.R);
out.writeLong(this.F);
out.writeDouble(this.M);
}
@Override
public void readFields(DataInput in) throws IOException {
this.customer_id = in.readUTF();
this.R = in.readLong();
this.F = in.readLong();
this.M = in.readDouble();
}
// hashCode must stay consistent with equals. Hash-based partitioning
// relies on hash codes, so equal keys must produce equal values here.
@Override
public int hashCode() {
return (int) (customer_id.hashCode() * 163 + R + F + M);
}
@Override
public boolean equals(Object o) {
if (o instanceof TextQuadlet) {
TextQuadlet tp = (TextQuadlet) o;
return customer_id.equals(tp.customer_id) && R == (tp.R) && F==(tp.F) && M==(tp.M);
}
return false;
}
@Override
public String toString() {
return customer_id + "," + R + "," + F + "," + M;
}
// compareTo follows the usual Comparable contract: a negative result
// means this key sorts before tp, a positive result means it sorts
// after tp, and 0 means the two keys are equal for sorting purposes.
// Returning 1 when this value is smaller therefore gives descending
// order.
@Override
public int compareTo(TextQuadlet tp) {
// The natural key customer_id is deliberately not taken into
// consideration here.
// So, as you might have concluded, R, F, M are sorted in descending order.
if (this.R != tp.R) {
if(this.R < tp.R) {
return 1;
}
else{
return -1;
}
}
if (this.F != tp.F) {
if(this.F < tp.F) {
return 1;
}
else{
return -1;
}
}
if (this.M != tp.M){
if(this.M < tp.M) {
return 1;
}
else{
return -1;
}
}
return 0;
}
public static int compare(TextQuadlet tp1, TextQuadlet tp2) {
int cmp = tp1.compareTo(tp2);
return cmp;
}
public static int compare(Text customer_id1, Text customer_id2) {
// Compare the first id against the second one (not against itself).
int cmp = customer_id1.compareTo(customer_id2);
return cmp;
}
}
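To check the ordering logic without a cluster, the same descending R, F, M comparison can be tried on the sample rows from the question. This is only a sketch under assumptions: Rfm is a hypothetical plain-Java stand-in for TextQuadlet (no Hadoop types), and Long.compare/Double.compare with swapped arguments replace the hand-written if/else chains.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

class RfmOrderSketch {
    // Hypothetical stand-in for TextQuadlet, holding the same four fields.
    static final class Rfm {
        final String customerId;
        final long r;
        final long f;
        final double m;

        Rfm(String customerId, long r, long f, double m) {
            this.customerId = customerId;
            this.r = r;
            this.f = f;
            this.m = m;
        }
    }

    // Swapping the arguments (b before a) yields descending order.
    static final Comparator<Rfm> DESCENDING_RFM = (a, b) -> {
        int cmp = Long.compare(b.r, a.r);
        if (cmp != 0) return cmp;
        cmp = Long.compare(b.f, a.f);
        if (cmp != 0) return cmp;
        return Double.compare(b.m, a.m);
    };

    // Sorts the sample rows and returns the customer ids in sorted order.
    static List<String> sortedCustomerIds() {
        List<Rfm> keys = new ArrayList<>(Arrays.asList(
            new Rfm("100000", 545, 1, 7652),
            new Rfm("100001", 545, 23, 390159.402343750),
            new Rfm("100002", 452, 13, 132586),
            new Rfm("100004", 452, 4, 32202),
            new Rfm("100007", 452, 1, 9310),
            new Rfm("100018", 452, 1, 4057),
            new Rfm("100021", 452, 3, 18970)));
        keys.sort(DESCENDING_RFM);
        List<String> ids = new ArrayList<>();
        for (Rfm key : keys) {
            ids.add(key.customerId);
        }
        return ids;
    }

    public static void main(String[] args) {
        // Prints [100001, 100000, 100002, 100004, 100021, 100007, 100018]
        System.out.println(sortedCustomerIds());
    }
}
```

The resulting customer order matches the desired output listed in the question.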
Next you need a custom partitioner so that all values with the same key end up at the same reducer:
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
public class FirstPartitioner_RFM extends Partitioner<TextQuadlet, Text> {
@Override
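public int getPartition(TextQuadlet key, Text value, int numPartitions) {
// Partition on the natural key only, so that every composite key of one
// customer goes to the same reducer. The mask keeps the result
// non-negative even when hashCode() is negative.
return (key.getCustomer_id().hashCode() & Integer.MAX_VALUE) % numPartitions;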
}
}
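A detail that is easy to miss with hash-based partitioning: in Java, `%` keeps the sign of the left operand, so a negative hash code produces a negative partition index, which Hadoop rejects. Hadoop's default HashPartitioner avoids this by masking with Integer.MAX_VALUE first. The sketch below shows the arithmetic in plain Java; PartitionSketch and partitionFor are hypothetical names used only for illustration.

```java
class PartitionSketch {
    // Maps any hash code to a partition index in [0, numPartitions).
    static int partitionFor(int hash, int numPartitions) {
        return (hash & Integer.MAX_VALUE) % numPartitions;
    }

    public static void main(String[] args) {
        int negativeHash = -7;
        // Plain remainder keeps the sign: not a valid partition index.
        System.out.println(negativeHash % 4);              // -3
        // Masking off the sign bit first gives a valid index.
        System.out.println(partitionFor(negativeHash, 4)); // 1
    }
}
```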
Third, you need a custom group comparator so that all values are grouped by their natural key customer_id rather than by the composite key customer_id,R,F,M:
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
public class GroupComparator_RFM_N extends WritableComparator {
protected GroupComparator_RFM_N() {
super(TextQuadlet.class, true);
}
@SuppressWarnings("rawtypes")
@Override
public int compare(WritableComparable w1, WritableComparable w2) {
TextQuadlet ip1 = (TextQuadlet) w1;
TextQuadlet ip2 = (TextQuadlet) w2;
// Here we tell hadoop to group the keys by their natural key.
return ip1.getCustomer_id().compareTo(ip2.getCustomer_id());
}
}
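It helps to remember that grouping happens on the already sorted key stream: consecutive keys that the group comparator reports as equal are handed to the same reduce() call, and keys that are equal but not adjacent are not merged. The plain-Java sketch below imitates that behavior on a list of natural keys; GroupingSketch and groupConsecutive are hypothetical names, and this is a simplified model rather than Hadoop's actual code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class GroupingSketch {
    // Splits an already sorted key stream into runs of equal neighbours.
    static List<List<String>> groupConsecutive(List<String> sortedNaturalKeys) {
        List<List<String>> groups = new ArrayList<>();
        String previous = null;
        for (String key : sortedNaturalKeys) {
            // Start a new group whenever the natural key changes.
            if (previous == null || !previous.equals(key)) {
                groups.add(new ArrayList<>());
            }
            groups.get(groups.size() - 1).add(key);
            previous = key;
        }
        return groups;
    }

    public static void main(String[] args) {
        // Adjacent equal keys form one group.
        System.out.println(groupConsecutive(Arrays.asList("100001", "100001", "100002")));
        // [[100001, 100001], [100002]]

        // Equal keys separated by another key end up in separate groups.
        System.out.println(groupConsecutive(Arrays.asList("100001", "100002", "100001")));
        // [[100001], [100002], [100001]]
    }
}
```

Because the sort comparator in this answer orders keys by R, F, M only, this matters mainly when one customer_id can appear with more than one R, F, M combination.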
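Fourth, you need a key comparator that again sorts the keys by R, F, M in descending order, using the same sorting technique as in TextQuadlet.java. Since I got lost while coding, I slightly changed the way the values are compared in this function, but the underlying logic is the same as in TextQuadlet.java: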
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
public class KeyComparator_RFM extends WritableComparator {
protected KeyComparator_RFM() {
super(TextQuadlet.class, true);
}
@SuppressWarnings("rawtypes")
@Override
public int compare(WritableComparable w1, WritableComparable w2) {
TextQuadlet ip1 = (TextQuadlet) w1;
TextQuadlet ip2 = (TextQuadlet) w2;
// compare follows the usual Comparator contract: a negative result means
// w1 sorts before w2, a positive result means w1 sorts after w2, and 0
// means the two keys are equal for sorting purposes.
// For strings, a.compareTo(b) is negative when a comes before b in
// lexicographic order.
// Returning 1 when ip1 is smaller gives descending order on R, then F,
// then M.
if(ip1.getR() == ip2.getR()){
if(ip1.getF() == ip2.getF()){
if(ip1.getM() == ip2.getM()){
return 0;
}
else{
if(ip1.getM() < ip2.getM())
return 1;
else
return -1;
}
}
else{
if(ip1.getF() < ip2.getF())
return 1;
else
return -1;
}
}
else{
if(ip1.getR() < ip2.getR())
return 1;
else
return -1;
}
}
}
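Since the nested form here and the flat form in TextQuadlet.compareTo are meant to express the same ordering, a quick way to gain confidence is to run both over the sample rows and check that they agree. The sketch below does this in plain Java on raw field values; the class and method names are hypothetical, and agreement on a few samples is a sanity check, not a proof.

```java
class ComparatorEquivalenceSketch {
    // Nested form, mirroring the structure of KeyComparator_RFM.
    static int nested(long r1, long f1, double m1, long r2, long f2, double m2) {
        if (r1 == r2) {
            if (f1 == f2) {
                if (m1 == m2) return 0;
                return m1 < m2 ? 1 : -1;
            }
            return f1 < f2 ? 1 : -1;
        }
        return r1 < r2 ? 1 : -1;
    }

    // Flat form, mirroring the structure of TextQuadlet.compareTo.
    static int flat(long r1, long f1, double m1, long r2, long f2, double m2) {
        if (r1 != r2) return r1 < r2 ? 1 : -1;
        if (f1 != f2) return f1 < f2 ? 1 : -1;
        if (m1 != m2) return m1 < m2 ? 1 : -1;
        return 0;
    }

    // Compares every pair of sample rows with both forms.
    static boolean agreeOnSamples() {
        long[][] rf = {{545, 1}, {545, 23}, {452, 13}, {452, 4}, {452, 1}, {452, 1}, {452, 3}};
        double[] m = {7652, 390159.402343750, 132586, 32202, 9310, 4057, 18970};
        for (int i = 0; i < m.length; i++) {
            for (int j = 0; j < m.length; j++) {
                int a = nested(rf[i][0], rf[i][1], m[i], rf[j][0], rf[j][1], m[j]);
                int b = flat(rf[i][0], rf[i][1], m[i], rf[j][0], rf[j][1], m[j]);
                if (a != b) return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(agreeOnSamples()); // true
    }
}
```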
Finally, in your driver class you have to register the custom classes. Here I use TextQuadlet,Text as the k-v pair, but you can choose any other classes according to your needs:
job.setPartitionerClass(FirstPartitioner_RFM.class);
job.setSortComparatorClass(KeyComparator_RFM.class);
job.setGroupingComparatorClass(GroupComparator_RFM_N.class);
job.setMapOutputKeyClass(TextQuadlet.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(TextQuadlet.class);
job.setOutputValueClass(Text.class);
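Please correct me if I have made a technical mistake somewhere in the code or the explanation. This answer is based entirely on my own understanding of what I read on the internet, and it works perfectly for me.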