Apache Flink: Skewed data distribution on KeyedStream
I have this Java code in Flink:
env.setParallelism(6);
// Read from Kafka topic with 12 partitions
DataStream<String> line = env.addSource(myConsumer);
// Note: line_Num (a DataStream<Tuple2<String, Integer>>) is used below, but its definition is not shown in the post
// Filter half of the records
DataStream<Tuple2<String, Integer>> line_Num_Odd = line_Num.filter(new FilterOdd());
DataStream<Tuple3<String, String, Integer>> line_Num_Odd_2 = line_Num_Odd.map(new OddAdder());
// Filter the other half
DataStream<Tuple2<String, Integer>> line_Num_Even = line_Num.filter(new FilterEven());
DataStream<Tuple3<String, String, Integer>> line_Num_Even_2 = line_Num_Even.map(new EvenAdder());
// Join all the data again
DataStream<Tuple3<String, String, Integer>> line_Num_U = line_Num_Odd_2.union(line_Num_Even_2);
// Window, keyed by tuple field 1 ("odd" or "even")
DataStream<Tuple3<String, String, Integer>> windowedLine_Num_U_K = line_Num_U
    .keyBy(1)
    .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
    .reduce(new Reducer());
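The problem is that the window should be able to run with parallelism = 2, since the Tuple3 contains two different groups of data, keyed by "odd" and "even". Everything runs with parallelism 6 except the window, which runs with parallelism = 1. Because of my requirements, I only need it to have parallelism = 2.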
The functions used in the code are the following:
public static class FilterOdd implements FilterFunction<Tuple2<String, Integer>> {
    public boolean filter(Tuple2<String, Integer> line) throws Exception {
        Boolean isOdd = (Long.valueOf(line.f0.split(" ")[0]) % 2) != 0;
        return isOdd;
    }
}

public static class FilterEven implements FilterFunction<Tuple2<String, Integer>> {
    public boolean filter(Tuple2<String, Integer> line) throws Exception {
        Boolean isEven = (Long.valueOf(line.f0.split(" ")[0]) % 2) == 0;
        return isEven;
    }
}

public static class OddAdder implements MapFunction<Tuple2<String, Integer>, Tuple3<String, String, Integer>> {
    public Tuple3<String, String, Integer> map(Tuple2<String, Integer> line) throws Exception {
        Tuple3<String, String, Integer> newLine = new Tuple3<String, String, Integer>(line.f0, "odd", line.f1);
        return newLine;
    }
}

public static class EvenAdder implements MapFunction<Tuple2<String, Integer>, Tuple3<String, String, Integer>> {
    public Tuple3<String, String, Integer> map(Tuple2<String, Integer> line) throws Exception {
        Tuple3<String, String, Integer> newLine = new Tuple3<String, String, Integer>(line.f0, "even", line.f1);
        return newLine;
    }
}

public static class Reducer implements ReduceFunction<Tuple3<String, String, Integer>> {
    public Tuple3<String, String, Integer> reduce(Tuple3<String, String, Integer> line1,
            Tuple3<String, String, Integer> line2) throws Exception {
        Long sum = Long.valueOf(line1.f0.split(" ")[0]) + Long.valueOf(line2.f0.split(" ")[0]);
        Long sumTS = Long.valueOf(line1.f0.split(" ")[1]) + Long.valueOf(line2.f0.split(" ")[1]);
        Tuple3<String, String, Integer> newLine = new Tuple3<String, String, Integer>(String.valueOf(sum) +
                " " + String.valueOf(sumTS), line1.f1, line1.f2 + line2.f2);
        return newLine;
    }
}
Thanks for your help!
SOLUTION: I have changed the content of the keys from "odd" and "even" to "odd0000" and "even1111", and it is working properly now.
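Keys are distributed to worker threads by hash partitioning. This means that the key values are hashed and the thread is determined by modulo #workers. With two keys and two threads, there is a good chance that both keys are assigned to the same thread.
You can try to use different key values whose hash values are distributed across both threads.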
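To make the "hash modulo #workers" idea concrete, here is a minimal plain-Java sketch of that simplified model. It is not Flink's implementation: the class name `HashPartitionSketch` and the methods `workerFor` and `assign` are hypothetical, and `String.hashCode()` is only an assumed stand-in for whatever hash function the framework applies internally. For that reason, the worker index it reports for a given key will generally not match what a real Flink job does; it only illustrates why two keys can end up on the same worker.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class HashPartitionSketch {

    // Simplified model: worker index = hash(key) mod number of workers.
    // Assumption: String.hashCode() stands in for the framework's hash function.
    static int workerFor(String key, int workers) {
        return Math.floorMod(key.hashCode(), workers);
    }

    // Maps every key to its worker index under the model above.
    static Map<String, Integer> assign(List<String> keys, int workers) {
        Map<String, Integer> result = new LinkedHashMap<>();
        for (String key : keys) {
            result.put(key, workerFor(key, workers));
        }
        return result;
    }

    public static void main(String[] args) {
        // "a" (hash 97) and "c" (hash 99) are both odd, so both map to worker 1
        // and worker 0 stays idle.
        System.out.println(assign(Arrays.asList("a", "c"), 2)); // prints {a=1, c=1}

        // "a" (hash 97) and "b" (hash 98) differ in parity, so both workers get a key.
        System.out.println(assign(Arrays.asList("a", "b"), 2)); // prints {a=1, b=0}
    }
}
```

With only two distinct keys, whether the load is split depends entirely on the two hash values, which is why changing the key strings can change how many parallel window instances actually receive data.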