Flink 中的预洗牌聚合
Pre-shuffle aggregation in Flink
我们正在将 spark 作业迁移到 flink。我们在 spark 中使用了 pre-shuffle 聚合。有没有办法在 spark.xml 中执行类似的操作?我们正在使用来自 apache kafka 的数据。我们正在使用键控翻滚 window 来聚合数据。我们希望在执行 shuffle 之前将 flink 中的数据进行聚合。
是的,这是可能的,我将描述三种方式。首先是已经内置的 Flink Table API。第二种方式你必须构建自己的预聚合运算符。第三个是动态预聚合运算符,它在洗牌阶段之前调整要预聚合的事件数。
弗林克TableAPI
与it is shown here一样,您可以进行MiniBatch聚合或本地-全局聚合。第二种选择更好。您基本上告诉 Flink 创建每个 5000
事件的小批量,并在洗牌阶段之前预先聚合它们。
// instantiate table environment
TableEnvironment tEnv = ...
// access flink configuration
Configuration configuration = tEnv.getConfig().getConfiguration();
// set low-level key-value options
configuration.setString("table.exec.mini-batch.enabled", "true");
configuration.setString("table.exec.mini-batch.allow-latency", "5 s");
configuration.setString("table.exec.mini-batch.size", "5000");
configuration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");
Flink 流API
这种方式比较麻烦,因为您必须使用 OneInputStreamOperator
创建自己的运算符并使用 doTransform() 调用它。这是 BundleOperator 的示例。
public abstract class AbstractMapStreamBundleOperator<K, V, IN, OUT>
extends AbstractUdfStreamOperator<OUT, MapBundleFunction<K, V, IN, OUT>>
implements OneInputStreamOperator<IN, OUT>, BundleTriggerCallback {
@Override
public void processElement(StreamRecord<IN> element) throws Exception {
// get the key and value for the map bundle
final IN input = element.getValue();
final K bundleKey = getKey(input);
final V bundleValue = this.bundle.get(bundleKey);
// get a new value after adding this element to bundle
final V newBundleValue = userFunction.addInput(bundleValue, input);
// update to map bundle
bundle.put(bundleKey, newBundleValue);
numOfElements++;
bundleTrigger.onElement(input);
}
@Override
public void finishBundle() throws Exception {
if (!bundle.isEmpty()) {
numOfElements = 0;
userFunction.finishBundle(bundle, collector);
bundle.clear();
}
bundleTrigger.reset();
}
}
回调接口定义何时触发预聚合。每次流达到 if (count >= maxCount)
的捆绑限制时,您的预聚合运算符都会向随机播放阶段发出事件。
public class CountBundleTrigger<T> implements BundleTrigger<T> {
private final long maxCount;
private transient BundleTriggerCallback callback;
private transient long count = 0;
public CountBundleTrigger(long maxCount) {
Preconditions.checkArgument(maxCount > 0, "maxCount must be greater than 0");
this.maxCount = maxCount;
}
@Override
public void registerCallback(BundleTriggerCallback callback) {
this.callback = Preconditions.checkNotNull(callback, "callback is null");
}
@Override
public void onElement(T element) throws Exception {
count++;
if (count >= maxCount) {
callback.finishBundle();
reset();
}
}
@Override
public void reset() {
count = 0;
}
}
然后您使用 doTransform
:
呼叫接线员
myStream.map(....)
.doTransform(metricCombiner, info, new RichMapStreamBundleOperator<>(myMapBundleFunction, bundleTrigger, keyBundleSelector))
.map(...)
.keyBy(...)
.window(TumblingProcessingTimeWindows.of(Time.seconds(20)))
动态预聚合
如果您希望使用动态预聚合运算符,请检查 AdCom - Adaptive Combiner for stream aggregation。它基本上根据背压信号调整预聚合。它导致使用最大可能的洗牌阶段。
我们正在将 spark 作业迁移到 flink。我们在 spark 中使用了 pre-shuffle 聚合。有没有办法在 spark.xml 中执行类似的操作?我们正在使用来自 apache kafka 的数据。我们正在使用键控翻滚 window 来聚合数据。我们希望在执行 shuffle 之前将 flink 中的数据进行聚合。
是的,这是可能的,我将描述三种方式。首先是已经内置的 Flink Table API。第二种方式你必须构建自己的预聚合运算符。第三个是动态预聚合运算符,它在洗牌阶段之前调整要预聚合的事件数。
弗林克TableAPI
与it is shown here一样,您可以进行MiniBatch聚合或本地-全局聚合。第二种选择更好。您基本上告诉 Flink 创建每个 5000
事件的小批量,并在洗牌阶段之前预先聚合它们。
// instantiate table environment
TableEnvironment tEnv = ...
// access flink configuration
Configuration configuration = tEnv.getConfig().getConfiguration();
// set low-level key-value options
configuration.setString("table.exec.mini-batch.enabled", "true");
configuration.setString("table.exec.mini-batch.allow-latency", "5 s");
configuration.setString("table.exec.mini-batch.size", "5000");
configuration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");
Flink 流API
这种方式比较麻烦,因为您必须使用 OneInputStreamOperator
创建自己的运算符并使用 doTransform() 调用它。这是 BundleOperator 的示例。
public abstract class AbstractMapStreamBundleOperator<K, V, IN, OUT>
extends AbstractUdfStreamOperator<OUT, MapBundleFunction<K, V, IN, OUT>>
implements OneInputStreamOperator<IN, OUT>, BundleTriggerCallback {
@Override
public void processElement(StreamRecord<IN> element) throws Exception {
// get the key and value for the map bundle
final IN input = element.getValue();
final K bundleKey = getKey(input);
final V bundleValue = this.bundle.get(bundleKey);
// get a new value after adding this element to bundle
final V newBundleValue = userFunction.addInput(bundleValue, input);
// update to map bundle
bundle.put(bundleKey, newBundleValue);
numOfElements++;
bundleTrigger.onElement(input);
}
@Override
public void finishBundle() throws Exception {
if (!bundle.isEmpty()) {
numOfElements = 0;
userFunction.finishBundle(bundle, collector);
bundle.clear();
}
bundleTrigger.reset();
}
}
回调接口定义何时触发预聚合。每次流达到 if (count >= maxCount)
的捆绑限制时,您的预聚合运算符都会向随机播放阶段发出事件。
public class CountBundleTrigger<T> implements BundleTrigger<T> {
private final long maxCount;
private transient BundleTriggerCallback callback;
private transient long count = 0;
public CountBundleTrigger(long maxCount) {
Preconditions.checkArgument(maxCount > 0, "maxCount must be greater than 0");
this.maxCount = maxCount;
}
@Override
public void registerCallback(BundleTriggerCallback callback) {
this.callback = Preconditions.checkNotNull(callback, "callback is null");
}
@Override
public void onElement(T element) throws Exception {
count++;
if (count >= maxCount) {
callback.finishBundle();
reset();
}
}
@Override
public void reset() {
count = 0;
}
}
然后您使用 doTransform
:
myStream.map(....)
.doTransform(metricCombiner, info, new RichMapStreamBundleOperator<>(myMapBundleFunction, bundleTrigger, keyBundleSelector))
.map(...)
.keyBy(...)
.window(TumblingProcessingTimeWindows.of(Time.seconds(20)))
动态预聚合
如果您希望使用动态预聚合运算符,请检查 AdCom - Adaptive Combiner for stream aggregation。它基本上根据背压信号调整预聚合。它导致使用最大可能的洗牌阶段。