Flink pre-shuffle aggregation is not working

I am trying to do pre-shuffle aggregation in Flink. Below is the MapBundle implementation.

public class TaxiFareMapBundleFunction extends MapBundleFunction<Long, TaxiFare, TaxiFare, TaxiFare> {

    @Override
    public TaxiFare addInput(@Nullable TaxiFare value, TaxiFare input) throws Exception {
        if (value == null) {
            return input;
        }
        value.tip = value.tip + input.tip;
        return value;
    }

    @Override
    public void finishBundle(Map<Long, TaxiFare> buffer, Collector<TaxiFare> out) throws Exception {
        for (Map.Entry<Long, TaxiFare> entry : buffer.entrySet()) {
            out.collect(entry.getValue());
        }
    }
}

I am using "CountBundleTrigger.java", but the pre-shuffle aggregation is not working: the "count" variable is always 0. Let me know if I am missing something. Here is its onElement() method:

@Override
public void onElement(T element) throws Exception {
    count++;
    if (count >= maxCount) {
        callback.finishBundle();
        reset();
    }
}

Here is the main code.

    MapBundleFunction<Long, TaxiFare, TaxiFare, TaxiFare> mapBundleFunction = new TaxiFareMapBundleFunction();
    BundleTrigger<TaxiFare> bundleTrigger = new CountBundleTrigger<>(10);
    KeySelector<TaxiFare, Long> taxiFareLongKeySelector = new KeySelector<TaxiFare, Long>() {
        @Override
        public Long getKey(TaxiFare value) throws Exception {
            return value.driverId;
        }
    };
    DataStream<Tuple3<Long, Long, Float>> hourlyTips =
//            fares.keyBy((TaxiFare fare) -> fare.driverId)
//                    .window(TumblingEventTimeWindows.of(Time.hours(1))).process(new AddTips());
            fares.transform("preshuffle", TypeInformation.of(TaxiFare.class),
                    new TaxiFareStream(mapBundleFunction, bundleTrigger, taxiFareLongKeySelector))
                    .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<TaxiFare>(Time.seconds(20)) {
                        @Override
                        public long extractTimestamp(TaxiFare element) {
                            return element.startTime.getEpochSecond();
                        }
                    })
                    .keyBy((TaxiFare fare) -> fare.driverId)
                    .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
                    .process(new AddTips());

    DataStream<Tuple3<Long, Long, Float>> hourlyMax =
            hourlyTips.windowAll(TumblingEventTimeWindows.of(Time.hours(1))).maxBy(2);

Here is the code of TaxiFareStream.java.

public class TaxiFareStream extends MapBundleOperator<Long, TaxiFare, TaxiFare, TaxiFare> {

    private final KeySelector<TaxiFare, Long> keySelector;

    public TaxiFareStream(MapBundleFunction<Long, TaxiFare, TaxiFare, TaxiFare> userFunction,
                          BundleTrigger<TaxiFare> bundleTrigger,
                          KeySelector<TaxiFare, Long> keySelector) {
        super(userFunction, bundleTrigger, keySelector);
        this.keySelector = keySelector;
    }

    @Override
    protected Long getKey(TaxiFare input) throws Exception {
        return keySelector.getKey(input);
    }
}

Update: I created the class below, but I am seeing an error. I think it is not able to serialize the class MapStreamBundleOperator.java.

public class MapStreamBundleOperator<K, V, IN, OUT> extends
                                                    AbstractMapStreamBundleOperator<K, V, IN, OUT> {

    private static final long serialVersionUID = 6556268125924098320L;

    /** KeySelector is used to extract key for bundle map. */
    private final KeySelector<IN, K> keySelector;

    public MapStreamBundleOperator(MapBundleFunction<K, V, IN, OUT> function, BundleTrigger<IN> bundleTrigger,
                                   KeySelector<IN, K> keySelector) {
        super(function, bundleTrigger);
        this.keySelector = keySelector;
    }

    @Override
    protected K getKey(IN input) throws Exception {
        return this.keySelector.getKey(input);
    }
}


2021-08-27 05:06:04,814 ERROR FlinkDefaults.class                                           - Stream execution failed
org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot serialize operator object class org.apache.flink.streaming.api.operators.SimpleUdfStreamOperatorFactory.
        at org.apache.flink.streaming.api.graph.StreamConfig.setStreamOperatorFactory(StreamConfig.java:247)
        at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.setVertexConfig(StreamingJobGraphGenerator.java:497)
        at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createChain(StreamingJobGraphGenerator.java:318)
        at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createChain(StreamingJobGraphGenerator.java:297)
        at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createChain(StreamingJobGraphGenerator.java:297)
        at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.setChaining(StreamingJobGraphGenerator.java:264)
        at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:173)
        at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:113)
        at org.apache.flink.streaming.api.graph.StreamGraph.getJobGraph(StreamGraph.java:850)
        at org.apache.flink.client.StreamGraphTranslator.translateToJobGraph(StreamGraphTranslator.java:52)
        at org.apache.flink.client.FlinkPipelineTranslationUtil.getJobGraph(FlinkPipelineTranslationUtil.java:43)
        at org.apache.flink.client.deployment.executors.PipelineExecutorUtils.getJobGraph(PipelineExecutorUtils.java:55)
        at org.apache.flink.client.deployment.executors.AbstractJobClusterExecutor.execute(AbstractJobClusterExecutor.java:62)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1810)
        at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:128)
        at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:76)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1697)
        at com.pinterest.xenon.flink.FlinkDefaults$.run(FlinkDefaults.scala:46)
        at com.pinterest.xenon.flink.FlinkWorkflow.run(FlinkWorkflow.scala:74)
        at com.pinterest.xenon.flink.WorkflowLauncher$.executeWorkflow(WorkflowLauncher.scala:43)
        at com.pinterest.xenon.flink.WorkflowLauncher$.delayedEndpoint$com$pinterest$xenon$flink$WorkflowLauncher(WorkflowLauncher.scala:25)
        at com.pinterest.xenon.flink.WorkflowLauncher$delayedInit$body.apply(WorkflowLauncher.scala:9)
        at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
        at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
        at scala.App$$anonfun$main.apply(App.scala:76)
        at scala.App$$anonfun$main.apply(App.scala:76)
        at scala.collection.immutable.List.foreach(List.scala:392)
        at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
        at scala.App$class.main(App.scala:76)
        at com.pinterest.xenon.flink.WorkflowLauncher$.main(WorkflowLauncher.scala:9)
        at com.pinterest.xenon.flink.WorkflowLauncher.main(WorkflowLauncher.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
        at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:168)
        at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
        at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
        at org.apache.flink.client.cli.CliFrontend.lambda$main(CliFrontend.java:992)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1893)
        at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
Caused by: java.io.NotSerializableException: visibility.mabs.src.main.java.com.pinterest.mabs.MabsFlinkJob
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)

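Note that the NotSerializableException at the bottom of this trace names the enclosing job class (MabsFlinkJob), not the operator itself. A common cause is an anonymous inner class, such as the KeySelector in the main code above, carrying an implicit reference to the non-serializable outer class. As a minimal sketch (the class name is illustrative), a standalone KeySelector avoids that capture:

public class DriverIdKeySelector implements KeySelector<TaxiFare, Long> {
    // A top-level (or static nested) class holds no implicit reference to the
    // enclosing job class, so Flink can serialize it on its own.
    private static final long serialVersionUID = 1L;

    @Override
    public Long getKey(TaxiFare value) {
        return value.driverId;
    }
}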

I am not relying on the official MapBundleOperator, because David has already said that it is not well documented. I will answer this question based on my own AbstractMapStreamBundleOperator. I think you are missing the counter numOfElements++; in the processElement() method. It is also better to use generic types. Use this code:

public abstract class AbstractMapStreamBundleOperator<K, V, IN, OUT>
        extends AbstractUdfStreamOperator<OUT, MapBundleFunction<K, V, IN, OUT>>
        implements OneInputStreamOperator<IN, OUT>, BundleTriggerCallback {
    private static final long serialVersionUID = 1L;
    private final Map<K, V> bundle;
    private final BundleTrigger<IN> bundleTrigger;
    private transient TimestampedCollector<OUT> collector;
    private transient int numOfElements = 0;

    public AbstractMapStreamBundleOperator(MapBundleFunction<K, V, IN, OUT> function, BundleTrigger<IN> bundleTrigger) {
        super(function);
        chainingStrategy = ChainingStrategy.ALWAYS;
        this.bundle = new HashMap<>();
        this.bundleTrigger = checkNotNull(bundleTrigger, "bundleTrigger is null");
    }
    @Override
    public void open() throws Exception {
        super.open();
        numOfElements = 0;
        collector = new TimestampedCollector<>(output);
        bundleTrigger.registerCallback(this);
        // reset trigger
        bundleTrigger.reset();
    }
    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        // get the key and value for the map bundle
        final IN input = element.getValue();
        final K bundleKey = getKey(input);
        final V bundleValue = this.bundle.get(bundleKey);

        // get a new value after adding this element to bundle
        final V newBundleValue = userFunction.addInput(bundleValue, input);

        // update to map bundle
        bundle.put(bundleKey, newBundleValue);
        numOfElements++;
        bundleTrigger.onElement(input);
    }
    protected abstract K getKey(final IN input) throws Exception;
    @Override
    public void finishBundle() throws Exception {
        if (!bundle.isEmpty()) {
            numOfElements = 0;
            userFunction.finishBundle(bundle, collector);
            bundle.clear();
        }
        bundleTrigger.reset();
    }
}
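One addition worth considering, not in the original operator above: Flink's own AbstractMapBundleOperator also flushes the bundle before checkpoint barriers and on shutdown, so buffered elements are never lost. A sketch of the corresponding overrides for AbstractMapStreamBundleOperator:

    @Override
    public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
        // flush the buffered bundle before the checkpoint barrier is emitted
        finishBundle();
    }

    @Override
    public void close() throws Exception {
        try {
            // flush whatever is still buffered when the operator shuts down
            finishBundle();
        } finally {
            super.close();
        }
    }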

Then create the MapStreamBundleOperator just like you already have. Use this code:

public class MapStreamBundleOperator<K, V, IN, OUT> extends AbstractMapStreamBundleOperator<K, V, IN, OUT> {
    private final KeySelector<IN, K> keySelector;
    public MapStreamBundleOperator(MapBundleFunction<K, V, IN, OUT> function, BundleTrigger<IN> bundleTrigger,
            KeySelector<IN, K> keySelector) {
        super(function, bundleTrigger);
        this.keySelector = keySelector;
    }
    @Override
    protected K getKey(IN input) throws Exception {
        return this.keySelector.getKey(input);
    }
}

The counter inside the trigger is what makes the bundle operator flush events to the next phase. The CountBundleTrigger is shown below. Use this code. You will also need the BundleTriggerCallback; minimal sketches of both interfaces follow the trigger.

public class CountBundleTrigger<T> implements BundleTrigger<T> {
    private final long maxCount;
    private transient BundleTriggerCallback callback;
    private transient long count = 0;

    public CountBundleTrigger(long maxCount) {
        Preconditions.checkArgument(maxCount > 0, "maxCount must be greater than 0");
        this.maxCount = maxCount;
    }
    @Override
    public void registerCallback(BundleTriggerCallback callback) {
        this.callback = Preconditions.checkNotNull(callback, "callback is null");
    }
    @Override
    public void onElement(T element) throws Exception {
        count++;
        if (count >= maxCount) {
            callback.finishBundle();
            reset();
        }
    }
    @Override
    public void reset() { count = 0; }
    @Override
    public String explain() {
        return "CountBundleTrigger with size " + maxCount;
    }
}
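For completeness, here are minimal sketches of the two interfaces, inferred from how they are used above (the originals live in the linked git repository, so package details may differ):

public interface BundleTrigger<T> extends Serializable {
    // wire the trigger to the operator that owns the bundle
    void registerCallback(BundleTriggerCallback callback);
    // called for every element; may fire the callback
    void onElement(T element) throws Exception;
    // reset the trigger state after a flush
    void reset();
    // human-readable description of the trigger
    String explain();
}

public interface BundleTriggerCallback {
    // flush the current bundle to the downstream collector
    void finishBundle() throws Exception;
}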

Then you have to create one of these triggers to pass to your operator. Here I am creating a bundle of 100 TaxiFare events. Take this example with another POJO. I wrote the MapBundleTaxiFareImpl here, but you can create your own UDF based on this one (a sketch of it follows the snippet below).

private OneInputStreamOperator<Tuple2<Long, TaxiFare>, Tuple2<Long, TaxiFare>> getPreAggOperator() {
   MapBundleFunction<Long, TaxiFare, Tuple2<Long, TaxiFare>, Tuple2<Long, TaxiFare>> myMapBundleFunction = new MapBundleTaxiFareImpl();
   CountBundleTrigger<Tuple2<Long, TaxiFare>> bundleTrigger = new CountBundleTrigger<Tuple2<Long, TaxiFare>>(100);
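   // keyBundleSelector is a KeySelector<Tuple2<Long, TaxiFare>, Long> defined
   // elsewhere, e.g. extracting the driverId from the tuple (not shown here).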
   return new MapStreamBundleOperator<>(myMapBundleFunction, bundleTrigger, keyBundleSelector);
}
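MapBundleTaxiFareImpl itself is not listed in this answer. As a sketch of what it could look like, based on the TaxiFareMapBundleFunction from the question and the tuple types used in getPreAggOperator() above (the original lives in the linked repository and may differ):

public class MapBundleTaxiFareImpl
        extends MapBundleFunction<Long, TaxiFare, Tuple2<Long, TaxiFare>, Tuple2<Long, TaxiFare>> {

    @Override
    public TaxiFare addInput(@Nullable TaxiFare value, Tuple2<Long, TaxiFare> input) throws Exception {
        if (value == null) {
            return input.f1;
        }
        // pre-aggregate by summing tips per driver
        value.tip = value.tip + input.f1.tip;
        return value;
    }

    @Override
    public void finishBundle(Map<Long, TaxiFare> buffer, Collector<Tuple2<Long, TaxiFare>> out) throws Exception {
        for (Map.Entry<Long, TaxiFare> entry : buffer.entrySet()) {
            out.collect(Tuple2.of(entry.getKey(), entry.getValue()));
        }
    }
}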

Finally, you call this new operator somewhere using transform(). Take this example with another POJO.

stream
    ...
    .transform("my-pre-agg",
        TypeInformation.of(new TypeHint<Tuple2<Long, TaxiFare>>(){}), getPreAggOperator())
    ...
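Keep in mind that this pre-aggregation does not replace the keyed window: it only reduces the number of records that are shuffled, so the downstream keyBy()/window()/process() chain stays in place.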

I think this is all that you need. Try to use these classes, and if something is missing, it is probably in the git repository where I put the links. I hope you succeed.