Apache Flink 上的 zipWithIndex
zipWithIndex on Apache Flink
我想为输入的每一行分配一个 id
- 它应该是从 0
到 N - 1
的数字,其中 N
是数字输入中的行数。
大致上,我希望能够执行以下操作:
val data = sc.textFile(textFilePath, numPartitions)
val rdd = data.map(line => process(line))
val rddMatrixLike = rdd.zipWithIndex.map { case (v, idx) => someStuffWithIndex(idx, v) }
但是在 Apache Flink 中。可能吗?
下面是函数的简单实现:
public class ZipWithIndex {
public static void main(String[] args) throws Exception {
ExecutionEnvironment ee = ExecutionEnvironment.getExecutionEnvironment();
DataSet<String> in = ee.readTextFile("/home/robert/flink-workdir/debug/input");
// count elements in each partition
DataSet<Tuple2<Integer, Long>> counts = in.mapPartition(new RichMapPartitionFunction<String, Tuple2<Integer, Long>>() {
@Override
public void mapPartition(Iterable<String> values, Collector<Tuple2<Integer, Long>> out) throws Exception {
long cnt = 0;
for (String v : values) {
cnt++;
}
out.collect(new Tuple2<Integer, Long>(getRuntimeContext().getIndexOfThisSubtask(), cnt));
}
});
DataSet<Tuple2<Long, String>> result = in.mapPartition(new RichMapPartitionFunction<String, Tuple2<Long, String>>() {
long start = 0;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
List<Tuple2<Integer, Long>> offsets = getRuntimeContext().getBroadcastVariable("counts");
Collections.sort(offsets, new Comparator<Tuple2<Integer, Long>>() {
@Override
public int compare(Tuple2<Integer, Long> o1, Tuple2<Integer, Long> o2) {
return ZipWithIndex.compare(o1.f0, o2.f0);
}
});
for(int i = 0; i < getRuntimeContext().getIndexOfThisSubtask(); i++) {
start += offsets.get(i).f1;
}
}
@Override
public void mapPartition(Iterable<String> values, Collector<Tuple2<Long, String>> out) throws Exception {
for(String v: values) {
out.collect(new Tuple2<Long, String>(start++, v));
}
}
}).withBroadcastSet(counts, "counts");
result.print();
}
public static int compare(int x, int y) {
return (x < y) ? -1 : ((x == y) ? 0 : 1);
}
}
它是这样工作的:我正在使用第一个 mapPartition()
操作遍历分区中的所有元素以计算其中有多少元素。
我需要知道每个分区中的元素数量,以便在将 ID 分配给元素时正确设置偏移量。
第一个 mapPartition
的结果是一个包含映射的数据集。我将此 DataSet 广播给所有第二个 mapPartition()
运算符,这些运算符会将 ID 分配给输入中的元素。
在第二个 mapPartition()
的 open()
方法中,我正在计算每个分区的偏移量。
我可能会将代码贡献给 Flink(在与其他提交者讨论之后)。
这现在是 Apache Flink 0.10-SNAPSHOT 版本的一部分。 zipWithIndex(in)
和 zipWithUniqueId(in)
的示例可在官方 Flink documentation.
中找到
我想为输入的每一行分配一个 id
- 它应该是从 0
到 N - 1
的数字,其中 N
是数字输入中的行数。
大致上,我希望能够执行以下操作:
val data = sc.textFile(textFilePath, numPartitions)
val rdd = data.map(line => process(line))
val rddMatrixLike = rdd.zipWithIndex.map { case (v, idx) => someStuffWithIndex(idx, v) }
但是在 Apache Flink 中。可能吗?
下面是函数的简单实现:
public class ZipWithIndex {
public static void main(String[] args) throws Exception {
ExecutionEnvironment ee = ExecutionEnvironment.getExecutionEnvironment();
DataSet<String> in = ee.readTextFile("/home/robert/flink-workdir/debug/input");
// count elements in each partition
DataSet<Tuple2<Integer, Long>> counts = in.mapPartition(new RichMapPartitionFunction<String, Tuple2<Integer, Long>>() {
@Override
public void mapPartition(Iterable<String> values, Collector<Tuple2<Integer, Long>> out) throws Exception {
long cnt = 0;
for (String v : values) {
cnt++;
}
out.collect(new Tuple2<Integer, Long>(getRuntimeContext().getIndexOfThisSubtask(), cnt));
}
});
DataSet<Tuple2<Long, String>> result = in.mapPartition(new RichMapPartitionFunction<String, Tuple2<Long, String>>() {
long start = 0;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
List<Tuple2<Integer, Long>> offsets = getRuntimeContext().getBroadcastVariable("counts");
Collections.sort(offsets, new Comparator<Tuple2<Integer, Long>>() {
@Override
public int compare(Tuple2<Integer, Long> o1, Tuple2<Integer, Long> o2) {
return ZipWithIndex.compare(o1.f0, o2.f0);
}
});
for(int i = 0; i < getRuntimeContext().getIndexOfThisSubtask(); i++) {
start += offsets.get(i).f1;
}
}
@Override
public void mapPartition(Iterable<String> values, Collector<Tuple2<Long, String>> out) throws Exception {
for(String v: values) {
out.collect(new Tuple2<Long, String>(start++, v));
}
}
}).withBroadcastSet(counts, "counts");
result.print();
}
public static int compare(int x, int y) {
return (x < y) ? -1 : ((x == y) ? 0 : 1);
}
}
它是这样工作的:我正在使用第一个 mapPartition()
操作遍历分区中的所有元素以计算其中有多少元素。
我需要知道每个分区中的元素数量,以便在将 ID 分配给元素时正确设置偏移量。
第一个 mapPartition
的结果是一个包含映射的数据集。我将此 DataSet 广播给所有第二个 mapPartition()
运算符,这些运算符会将 ID 分配给输入中的元素。
在第二个 mapPartition()
的 open()
方法中,我正在计算每个分区的偏移量。
我可能会将代码贡献给 Flink(在与其他提交者讨论之后)。
这现在是 Apache Flink 0.10-SNAPSHOT 版本的一部分。 zipWithIndex(in)
和 zipWithUniqueId(in)
的示例可在官方 Flink documentation.