Flink 预洗牌聚合不起作用
Flink pre shuffle aggregation is not working
我正在尝试在 flink 中进行预洗牌聚合。以下是 MapBundle 实现。
public class TaxiFareMapBundleFunction extends MapBundleFunction<Long, TaxiFare, TaxiFare, TaxiFare> {
@Override
public TaxiFare addInput(@Nullable TaxiFare value, TaxiFare input) throws Exception {
if (value == null) {
return input;
}
value.tip = value.tip + input.tip;
return value;
}
@Override
public void finishBundle(Map<Long, TaxiFare> buffer, Collector<TaxiFare> out) throws Exception {
for (Map.Entry<Long, TaxiFare> entry : buffer.entrySet()) {
out.collect(entry.getValue());
}
}
}
我正在使用“CountBundleTrigger.java”。但是预洗牌聚合不起作用,因为“计数”变量始终为 0。如果我遗漏了什么,请告诉我。
@Override
public void onElement(T element) throws Exception {
count++;
if (count >= maxCount) {
callback.finishBundle();
reset();
}
}
这是主要代码。
MapBundleFunction<Long, TaxiFare, TaxiFare, TaxiFare> mapBundleFunction = new TaxiFareMapBundleFunction();
BundleTrigger<TaxiFare> bundleTrigger = new CountBundleTrigger<>(10);
KeySelector<TaxiFare, Long> taxiFareLongKeySelector = new KeySelector<TaxiFare, Long>() {
@Override
public Long getKey(TaxiFare value) throws Exception {
return value.driverId;
}
};
DataStream<Tuple3<Long, Long, Float>> hourlyTips =
// fares.keyBy((TaxiFare fare) -> fare.driverId)
//
.window(TumblingEventTimeWindows.of(Time.hours(1))).process(new AddTips());;
fares.transform("preshuffle", TypeInformation.of(TaxiFare.class),
new TaxiFareStream(mapBundleFunction, bundleTrigger,
taxiFareLongKeySelector
))
.assignTimestampsAndWatermarks(new
BoundedOutOfOrdernessTimestampExtractor<TaxiFare>(Time.seconds(20)) {
@Override
public long extractTimestamp(TaxiFare element) {
return element.startTime.getEpochSecond();
}
})
.keyBy((TaxiFare fare) -> fare.driverId)
.window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
.process(new AddTips());
DataStream<Tuple3<Long, Long, Float>> hourlyMax =
hourlyTips.windowAll(TumblingEventTimeWindows.of(Time.hours(1))).maxBy(2);
这是 TaxiFareStream.java 的代码。
public class TaxiFareStream extends MapBundleOperator<Long, TaxiFare, TaxiFare, TaxiFare> {
private KeySelector<TaxiFare, Long> keySelector;
public TaxiFareStream(MapBundleFunction<Long, TaxiFare,
TaxiFare, TaxiFare> userFunction,
BundleTrigger<TaxiFare> bundleTrigger,
KeySelector<TaxiFare, Long> keySelector) {
super(userFunction, bundleTrigger, keySelector);
this.keySelector = keySelector;
}
@Override
protected Long getKey(TaxiFare input) throws Exception {
return keySelector.getKey(input);
}
}
更新
我创建了以下 class 但我看到了一个错误。我认为它无法序列化 class MapStreamBundleOperator.java
public class MapStreamBundleOperator<K, V, IN, OUT> extends
AbstractMapStreamBundleOperator<K, V, IN, OUT> {
private static final long serialVersionUID = 6556268125924098320L;
/** KeySelector is used to extract key for bundle map. */
private final KeySelector<IN, K> keySelector;
public MapStreamBundleOperator(MapBundleFunction<K, V, IN, OUT> function, BundleTrigger<IN> bundleTrigger,
KeySelector<IN, K> keySelector) {
super(function, bundleTrigger);
this.keySelector = keySelector;
}
@Override
protected K getKey(IN input) throws Exception {
return this.keySelector.getKey(input);
}
}
`
2021-08-27 05:06:04,814 ERROR FlinkDefaults.class - Stream execution failed
org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot serialize operator object class org.apache.flink.streaming.api.operators.SimpleUdfStreamOperatorFactory.
at org.apache.flink.streaming.api.graph.StreamConfig.setStreamOperatorFactory(StreamConfig.java:247)
at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.setVertexConfig(StreamingJobGraphGenerator.java:497)
at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createChain(StreamingJobGraphGenerator.java:318)
at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createChain(StreamingJobGraphGenerator.java:297)
at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createChain(StreamingJobGraphGenerator.java:297)
at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.setChaining(StreamingJobGraphGenerator.java:264)
at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:173)
at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:113)
at org.apache.flink.streaming.api.graph.StreamGraph.getJobGraph(StreamGraph.java:850)
at org.apache.flink.client.StreamGraphTranslator.translateToJobGraph(StreamGraphTranslator.java:52)
at org.apache.flink.client.FlinkPipelineTranslationUtil.getJobGraph(FlinkPipelineTranslationUtil.java:43)
at org.apache.flink.client.deployment.executors.PipelineExecutorUtils.getJobGraph(PipelineExecutorUtils.java:55)
at org.apache.flink.client.deployment.executors.AbstractJobClusterExecutor.execute(AbstractJobClusterExecutor.java:62)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1810)
at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:128)
at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:76)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1697)
at com.pinterest.xenon.flink.FlinkDefaults$.run(FlinkDefaults.scala:46)
at com.pinterest.xenon.flink.FlinkWorkflow.run(FlinkWorkflow.scala:74)
at com.pinterest.xenon.flink.WorkflowLauncher$.executeWorkflow(WorkflowLauncher.scala:43)
at com.pinterest.xenon.flink.WorkflowLauncher$.delayedEndpoint$com$pinterest$xenon$flink$WorkflowLauncher(WorkflowLauncher.scala:25)
at com.pinterest.xenon.flink.WorkflowLauncher$delayedInit$body.apply(WorkflowLauncher.scala:9)
at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
at scala.App$$anonfun$main.apply(App.scala:76)
at scala.App$$anonfun$main.apply(App.scala:76)
at scala.collection.immutable.List.foreach(List.scala:392)
at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
at scala.App$class.main(App.scala:76)
at com.pinterest.xenon.flink.WorkflowLauncher$.main(WorkflowLauncher.scala:9)
at com.pinterest.xenon.flink.WorkflowLauncher.main(WorkflowLauncher.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:168)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
at org.apache.flink.client.cli.CliFrontend.lambda$main(CliFrontend.java:992)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1893)
at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
Caused by: java.io.NotSerializableException: visibility.mabs.src.main.java.com.pinterest.mabs.MabsFlinkJob
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
`
我不会依赖官方 MapBundleOperator
因为大卫已经说过这没有很好的记录。我会根据自己的AbstractMapStreamBundleOperator
来回答这个问题。我认为您在 processElement()
方法中缺少计数器 numOfElements++;
。而且最好使用泛型类型。使用这个 code:
public abstract class AbstractMapStreamBundleOperator<K, V, IN, OUT>
extends AbstractUdfStreamOperator<OUT, MapBundleFunction<K, V, IN, OUT>>
implements OneInputStreamOperator<IN, OUT>, BundleTriggerCallback {
private static final long serialVersionUID = 1L;
private final Map<K, V> bundle;
private final BundleTrigger<IN> bundleTrigger;
private transient TimestampedCollector<OUT> collector;
private transient int numOfElements = 0;
public AbstractMapStreamBundleOperator(MapBundleFunction<K, V, IN, OUT> function, BundleTrigger<IN> bundleTrigger) {
super(function);
chainingStrategy = ChainingStrategy.ALWAYS;
this.bundle = new HashMap<>();
this.bundleTrigger = checkNotNull(bundleTrigger, "bundleTrigger is null");
}
@Override
public void open() throws Exception {
super.open();
numOfElements = 0;
collector = new TimestampedCollector<>(output);
bundleTrigger.registerCallback(this);
// reset trigger
bundleTrigger.reset();
}
@Override
public void processElement(StreamRecord<IN> element) throws Exception {
// get the key and value for the map bundle
final IN input = element.getValue();
final K bundleKey = getKey(input);
final V bundleValue = this.bundle.get(bundleKey);
// get a new value after adding this element to bundle
final V newBundleValue = userFunction.addInput(bundleValue, input);
// update to map bundle
bundle.put(bundleKey, newBundleValue);
numOfElements++;
bundleTrigger.onElement(input);
}
protected abstract K getKey(final IN input) throws Exception;
@Override
public void finishBundle() throws Exception {
if (!bundle.isEmpty()) {
numOfElements = 0;
userFunction.finishBundle(bundle, collector);
bundle.clear();
}
bundleTrigger.reset();
}
}
然后像您已有的那样创建 MapStreamBundleOperator
。使用这个 code:
public class MapStreamBundleOperator<K, V, IN, OUT> extends AbstractMapStreamBundleOperator<K, V, IN, OUT> {
private final KeySelector<IN, K> keySelector;
public MapStreamBundleOperator(MapBundleFunction<K, V, IN, OUT> function, BundleTrigger<IN> bundleTrigger,
KeySelector<IN, K> keySelector) {
super(function, bundleTrigger);
this.keySelector = keySelector;
}
@Override
protected K getKey(IN input) throws Exception {
return this.keySelector.getKey(input);
}
}
触发器内的计数器使 Bundle 运算符将事件刷新到下一阶段。 CountBundleTrigger
如下所示。使用这个 code. You will need also the BundleTriggerCallback.
public class CountBundleTrigger<T> implements BundleTrigger<T> {
private final long maxCount;
private transient BundleTriggerCallback callback;
private transient long count = 0;
public CountBundleTrigger(long maxCount) {
Preconditions.checkArgument(maxCount > 0, "maxCount must be greater than 0");
this.maxCount = maxCount;
}
@Override
public void registerCallback(BundleTriggerCallback callback) {
this.callback = Preconditions.checkNotNull(callback, "callback is null");
}
@Override
public void onElement(T element) throws Exception {
count++;
if (count >= maxCount) {
callback.finishBundle();
reset();
}
}
@Override
public void reset() { count = 0; }
@Override
public String explain() {
return "CountBundleTrigger with size " + maxCount;
}
}
然后你必须创建一个这个触发器来传递你的操作员。我在这里创建了一组 100
TaxiFare
事件。取this example with another POJO. I wrote the MapBundleTaxiFareImpl
here but you can create your UDF based on this one.
private OneInputStreamOperator<Tuple2<Long, TaxiFare>, Tuple2<Long, TaxiFare>> getPreAggOperator() {
MapBundleFunction<Long, TaxiFare, Tuple2<Long, TaxiFare>, Tuple2<Long, TaxiFare>> myMapBundleFunction = new MapBundleTaxiFareImpl();
CountBundleTrigger<Tuple2<Long, TaxiFare>> bundleTrigger = new CountBundleTrigger<Tuple2<Long, TaxiFare>>(100);
return new MapStreamBundleOperator<>(myMapBundleFunction, bundleTrigger, keyBundleSelector);
}
最后,您在某处使用 transform()
调用了这个新运算符。拿this example with another POJO.
stream
...
.transform("my-pre-agg",
TypeInformation.of(new TypeHint<Tuple2<Long, TaxiFare>>(){}), getPreAggOperator())
...
我认为这就是您所需要的。尝试使用那些 class ,如果它缺少某些东西,它可能在我放置链接的 gitrepository 上。我希望你能成功。
我正在尝试在 flink 中进行预洗牌聚合。以下是 MapBundle 实现。
public class TaxiFareMapBundleFunction extends MapBundleFunction<Long, TaxiFare, TaxiFare, TaxiFare> {
@Override
public TaxiFare addInput(@Nullable TaxiFare value, TaxiFare input) throws Exception {
if (value == null) {
return input;
}
value.tip = value.tip + input.tip;
return value;
}
@Override
public void finishBundle(Map<Long, TaxiFare> buffer, Collector<TaxiFare> out) throws Exception {
for (Map.Entry<Long, TaxiFare> entry : buffer.entrySet()) {
out.collect(entry.getValue());
}
}
}
我正在使用“CountBundleTrigger.java”。但是预洗牌聚合不起作用,因为“计数”变量始终为 0。如果我遗漏了什么,请告诉我。
@Override
public void onElement(T element) throws Exception {
count++;
if (count >= maxCount) {
callback.finishBundle();
reset();
}
}
这是主要代码。
MapBundleFunction<Long, TaxiFare, TaxiFare, TaxiFare> mapBundleFunction = new TaxiFareMapBundleFunction();
BundleTrigger<TaxiFare> bundleTrigger = new CountBundleTrigger<>(10);
KeySelector<TaxiFare, Long> taxiFareLongKeySelector = new KeySelector<TaxiFare, Long>() {
@Override
public Long getKey(TaxiFare value) throws Exception {
return value.driverId;
}
};
DataStream<Tuple3<Long, Long, Float>> hourlyTips =
// fares.keyBy((TaxiFare fare) -> fare.driverId)
//
.window(TumblingEventTimeWindows.of(Time.hours(1))).process(new AddTips());;
fares.transform("preshuffle", TypeInformation.of(TaxiFare.class),
new TaxiFareStream(mapBundleFunction, bundleTrigger,
taxiFareLongKeySelector
))
.assignTimestampsAndWatermarks(new
BoundedOutOfOrdernessTimestampExtractor<TaxiFare>(Time.seconds(20)) {
@Override
public long extractTimestamp(TaxiFare element) {
return element.startTime.getEpochSecond();
}
})
.keyBy((TaxiFare fare) -> fare.driverId)
.window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
.process(new AddTips());
DataStream<Tuple3<Long, Long, Float>> hourlyMax =
hourlyTips.windowAll(TumblingEventTimeWindows.of(Time.hours(1))).maxBy(2);
这是 TaxiFareStream.java 的代码。
public class TaxiFareStream extends MapBundleOperator<Long, TaxiFare, TaxiFare, TaxiFare> {
private KeySelector<TaxiFare, Long> keySelector;
public TaxiFareStream(MapBundleFunction<Long, TaxiFare,
TaxiFare, TaxiFare> userFunction,
BundleTrigger<TaxiFare> bundleTrigger,
KeySelector<TaxiFare, Long> keySelector) {
super(userFunction, bundleTrigger, keySelector);
this.keySelector = keySelector;
}
@Override
protected Long getKey(TaxiFare input) throws Exception {
return keySelector.getKey(input);
}
}
更新
我创建了以下 class 但我看到了一个错误。我认为它无法序列化 class MapStreamBundleOperator.java
public class MapStreamBundleOperator<K, V, IN, OUT> extends
AbstractMapStreamBundleOperator<K, V, IN, OUT> {
private static final long serialVersionUID = 6556268125924098320L;
/** KeySelector is used to extract key for bundle map. */
private final KeySelector<IN, K> keySelector;
public MapStreamBundleOperator(MapBundleFunction<K, V, IN, OUT> function, BundleTrigger<IN> bundleTrigger,
KeySelector<IN, K> keySelector) {
super(function, bundleTrigger);
this.keySelector = keySelector;
}
@Override
protected K getKey(IN input) throws Exception {
return this.keySelector.getKey(input);
}
}
`
2021-08-27 05:06:04,814 ERROR FlinkDefaults.class - Stream execution failed
org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot serialize operator object class org.apache.flink.streaming.api.operators.SimpleUdfStreamOperatorFactory.
at org.apache.flink.streaming.api.graph.StreamConfig.setStreamOperatorFactory(StreamConfig.java:247)
at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.setVertexConfig(StreamingJobGraphGenerator.java:497)
at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createChain(StreamingJobGraphGenerator.java:318)
at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createChain(StreamingJobGraphGenerator.java:297)
at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createChain(StreamingJobGraphGenerator.java:297)
at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.setChaining(StreamingJobGraphGenerator.java:264)
at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:173)
at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:113)
at org.apache.flink.streaming.api.graph.StreamGraph.getJobGraph(StreamGraph.java:850)
at org.apache.flink.client.StreamGraphTranslator.translateToJobGraph(StreamGraphTranslator.java:52)
at org.apache.flink.client.FlinkPipelineTranslationUtil.getJobGraph(FlinkPipelineTranslationUtil.java:43)
at org.apache.flink.client.deployment.executors.PipelineExecutorUtils.getJobGraph(PipelineExecutorUtils.java:55)
at org.apache.flink.client.deployment.executors.AbstractJobClusterExecutor.execute(AbstractJobClusterExecutor.java:62)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1810)
at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:128)
at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:76)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1697)
at com.pinterest.xenon.flink.FlinkDefaults$.run(FlinkDefaults.scala:46)
at com.pinterest.xenon.flink.FlinkWorkflow.run(FlinkWorkflow.scala:74)
at com.pinterest.xenon.flink.WorkflowLauncher$.executeWorkflow(WorkflowLauncher.scala:43)
at com.pinterest.xenon.flink.WorkflowLauncher$.delayedEndpoint$com$pinterest$xenon$flink$WorkflowLauncher(WorkflowLauncher.scala:25)
at com.pinterest.xenon.flink.WorkflowLauncher$delayedInit$body.apply(WorkflowLauncher.scala:9)
at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
at scala.App$$anonfun$main.apply(App.scala:76)
at scala.App$$anonfun$main.apply(App.scala:76)
at scala.collection.immutable.List.foreach(List.scala:392)
at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
at scala.App$class.main(App.scala:76)
at com.pinterest.xenon.flink.WorkflowLauncher$.main(WorkflowLauncher.scala:9)
at com.pinterest.xenon.flink.WorkflowLauncher.main(WorkflowLauncher.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:168)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
at org.apache.flink.client.cli.CliFrontend.lambda$main(CliFrontend.java:992)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1893)
at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
Caused by: java.io.NotSerializableException: visibility.mabs.src.main.java.com.pinterest.mabs.MabsFlinkJob
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
`
我不会依赖官方 MapBundleOperator
因为大卫已经说过这没有很好的记录。我会根据自己的AbstractMapStreamBundleOperator
来回答这个问题。我认为您在 processElement()
方法中缺少计数器 numOfElements++;
。而且最好使用泛型类型。使用这个 code:
public abstract class AbstractMapStreamBundleOperator<K, V, IN, OUT>
extends AbstractUdfStreamOperator<OUT, MapBundleFunction<K, V, IN, OUT>>
implements OneInputStreamOperator<IN, OUT>, BundleTriggerCallback {
private static final long serialVersionUID = 1L;
private final Map<K, V> bundle;
private final BundleTrigger<IN> bundleTrigger;
private transient TimestampedCollector<OUT> collector;
private transient int numOfElements = 0;
public AbstractMapStreamBundleOperator(MapBundleFunction<K, V, IN, OUT> function, BundleTrigger<IN> bundleTrigger) {
super(function);
chainingStrategy = ChainingStrategy.ALWAYS;
this.bundle = new HashMap<>();
this.bundleTrigger = checkNotNull(bundleTrigger, "bundleTrigger is null");
}
@Override
public void open() throws Exception {
super.open();
numOfElements = 0;
collector = new TimestampedCollector<>(output);
bundleTrigger.registerCallback(this);
// reset trigger
bundleTrigger.reset();
}
@Override
public void processElement(StreamRecord<IN> element) throws Exception {
// get the key and value for the map bundle
final IN input = element.getValue();
final K bundleKey = getKey(input);
final V bundleValue = this.bundle.get(bundleKey);
// get a new value after adding this element to bundle
final V newBundleValue = userFunction.addInput(bundleValue, input);
// update to map bundle
bundle.put(bundleKey, newBundleValue);
numOfElements++;
bundleTrigger.onElement(input);
}
protected abstract K getKey(final IN input) throws Exception;
@Override
public void finishBundle() throws Exception {
if (!bundle.isEmpty()) {
numOfElements = 0;
userFunction.finishBundle(bundle, collector);
bundle.clear();
}
bundleTrigger.reset();
}
}
然后像您已有的那样创建 MapStreamBundleOperator
。使用这个 code:
public class MapStreamBundleOperator<K, V, IN, OUT> extends AbstractMapStreamBundleOperator<K, V, IN, OUT> {
private final KeySelector<IN, K> keySelector;
public MapStreamBundleOperator(MapBundleFunction<K, V, IN, OUT> function, BundleTrigger<IN> bundleTrigger,
KeySelector<IN, K> keySelector) {
super(function, bundleTrigger);
this.keySelector = keySelector;
}
@Override
protected K getKey(IN input) throws Exception {
return this.keySelector.getKey(input);
}
}
触发器内的计数器使 Bundle 运算符将事件刷新到下一阶段。 CountBundleTrigger
如下所示。使用这个 code. You will need also the BundleTriggerCallback.
public class CountBundleTrigger<T> implements BundleTrigger<T> {
private final long maxCount;
private transient BundleTriggerCallback callback;
private transient long count = 0;
public CountBundleTrigger(long maxCount) {
Preconditions.checkArgument(maxCount > 0, "maxCount must be greater than 0");
this.maxCount = maxCount;
}
@Override
public void registerCallback(BundleTriggerCallback callback) {
this.callback = Preconditions.checkNotNull(callback, "callback is null");
}
@Override
public void onElement(T element) throws Exception {
count++;
if (count >= maxCount) {
callback.finishBundle();
reset();
}
}
@Override
public void reset() { count = 0; }
@Override
public String explain() {
return "CountBundleTrigger with size " + maxCount;
}
}
然后你必须创建一个这个触发器来传递你的操作员。我在这里创建了一组 100
TaxiFare
事件。取this example with another POJO. I wrote the MapBundleTaxiFareImpl
here but you can create your UDF based on this one.
private OneInputStreamOperator<Tuple2<Long, TaxiFare>, Tuple2<Long, TaxiFare>> getPreAggOperator() {
MapBundleFunction<Long, TaxiFare, Tuple2<Long, TaxiFare>, Tuple2<Long, TaxiFare>> myMapBundleFunction = new MapBundleTaxiFareImpl();
CountBundleTrigger<Tuple2<Long, TaxiFare>> bundleTrigger = new CountBundleTrigger<Tuple2<Long, TaxiFare>>(100);
return new MapStreamBundleOperator<>(myMapBundleFunction, bundleTrigger, keyBundleSelector);
}
最后,您在某处使用 transform()
调用了这个新运算符。拿this example with another POJO.
stream
...
.transform("my-pre-agg",
TypeInformation.of(new TypeHint<Tuple2<Long, TaxiFare>>(){}), getPreAggOperator())
...
我认为这就是您所需要的。尝试使用那些 class ,如果它缺少某些东西,它可能在我放置链接的 gitrepository 上。我希望你能成功。