Flink 中的预洗牌聚合

Pre-shuffle aggregation in Flink

我们正在将 spark 作业迁移到 flink。我们在 spark 中使用了 pre-shuffle 聚合。有没有办法在 spark.xml 中执行类似的操作?我们正在使用来自 apache kafka 的数据。我们正在使用键控翻滚 window 来聚合数据。我们希望在执行 shuffle 之前将 flink 中的数据进行聚合。

https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/best_practices/prefer_reducebykey_over_groupbykey.html

是的,这是可能的,我将描述三种方式。首先是已经内置的 Flink Table API。第二种方式你必须构建自己的预聚合运算符。第三个是动态预聚合运算符,它在洗牌阶段之前调整要预聚合的事件数。

弗林克TableAPI

it is shown here一样,您可以进行MiniBatch聚合本地-全局聚合。第二种选择更好。您基本上告诉 Flink 创建每个 5000 事件的小批量,并在洗牌阶段之前预先聚合它们。

// instantiate table environment
TableEnvironment tEnv = ...

// access flink configuration
Configuration configuration = tEnv.getConfig().getConfiguration();
// set low-level key-value options
configuration.setString("table.exec.mini-batch.enabled", "true");
configuration.setString("table.exec.mini-batch.allow-latency", "5 s");
configuration.setString("table.exec.mini-batch.size", "5000");
configuration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");

Flink 流API

这种方式比较麻烦,因为您必须使用 OneInputStreamOperator 创建自己的运算符并使用 doTransform() 调用它。这是 BundleOperator 的示例。

public abstract class AbstractMapStreamBundleOperator<K, V, IN, OUT>
  extends AbstractUdfStreamOperator<OUT, MapBundleFunction<K, V, IN, OUT>>
  implements OneInputStreamOperator<IN, OUT>, BundleTriggerCallback {
@Override
 public void processElement(StreamRecord<IN> element) throws Exception {
  // get the key and value for the map bundle
  final IN input = element.getValue();
  final K bundleKey = getKey(input);
  final V bundleValue = this.bundle.get(bundleKey);

  // get a new value after adding this element to bundle
  final V newBundleValue = userFunction.addInput(bundleValue, input);

  // update to map bundle
  bundle.put(bundleKey, newBundleValue);

  numOfElements++;
  bundleTrigger.onElement(input);
 }

 @Override
 public void finishBundle() throws Exception {
  if (!bundle.isEmpty()) {
   numOfElements = 0;
   userFunction.finishBundle(bundle, collector);
   bundle.clear();
  }
  bundleTrigger.reset();
 }
}

回调接口定义何时触发预聚合。每次流达到 if (count >= maxCount) 的捆绑限制时,您的预聚合运算符都会向随机播放阶段发出事件。

public class CountBundleTrigger<T> implements BundleTrigger<T> {
 private final long maxCount;
 private transient BundleTriggerCallback callback;
 private transient long count = 0;

 public CountBundleTrigger(long maxCount) {
  Preconditions.checkArgument(maxCount > 0, "maxCount must be greater than 0");
  this.maxCount = maxCount;
 }

 @Override
 public void registerCallback(BundleTriggerCallback callback) {
  this.callback = Preconditions.checkNotNull(callback, "callback is null");
 }

 @Override
 public void onElement(T element) throws Exception {
  count++;
  if (count >= maxCount) {
   callback.finishBundle();
   reset();
  }
 }

 @Override
 public void reset() {
  count = 0;
 }
}

然后您使用 doTransform:

呼叫接线员
myStream.map(....)
 .doTransform(metricCombiner, info, new RichMapStreamBundleOperator<>(myMapBundleFunction, bundleTrigger, keyBundleSelector))
 .map(...)
 .keyBy(...)
 .window(TumblingProcessingTimeWindows.of(Time.seconds(20)))

动态预聚合

如果您希望使用动态预聚合运算符,请检查 AdCom - Adaptive Combiner for stream aggregation。它基本上根据背压信号调整预聚合。它导致使用最大可能的洗牌阶段。