FLINK CEP (Java 8) - 通过匹配模式持久"identity"
FLINK CEP (Java 8) - persistent "identity" through matching pattern
我正在尝试使用 FLINK-CEP 来测量市场中的出价从 BidState.OPEN
到 BidState.Closed
所花费的时间。我收到了带有 ID 和状态的出价数据流,目前我正在将所有 "OPENED" 出价与所有 "CLOSED" 出价进行匹配。
我在 patternStream.process
中有一个条件,它只允许将具有相同 ID 的开始和结束出价配对,这是应该的。但这感觉不对,因为匹配的数量以这种方式增长得非常快,而且我觉得这可以通过模式来完成。那么,有没有办法确保 "start" 和 "end" 对象具有相同的 ID?
AfterMatchSkipStrategy skipStrategy = AfterMatchSkipStrategy.noSkip();
//Is it possible to make sure that start.BidID == end.BidID in the pattern?
Pattern<BidEvent, ?> pattern = Pattern.<BidEvent>begin("start", skipStrategy).where(
new SimpleCondition<BidEvent>() {
@Override
public boolean filter(BidEvent value) {
return value.getState() == BidState.OPENED;
}
}).followedByAny("end").where(
new SimpleCondition<BidEvent>() {
@Override
public boolean filter(BidEvent value) throws Exception {
return value.getState() == BidState.CLOSED; // && value.getBidID == start.getBidID?
}
}).within(timeout);
PatternStream<BidEvent> patternStream = CEP.pattern(BidEventDataStream, pattern);
patternStream.process(new PatternProcessFunction<BidEvent, MatchingDuration>() {
@Override
public void processMatch(Map<String
, List<BidEvent>> map
, Context context
, Collector<MatchingDuration> collector) {
BidEvent start = map.get("start").get(0);
BidEvent end = map.get("end").get(0);
if (start.getBidId() == end.getBidId()){ // Make sure opening and closing bid is the same. Can this be done in the pattern?
collector.collect(new MatchingDuration(start.getBidId(), (end.getTimestamp() - start.getTimestamp())));
}
}
}).addSink(matchingDurationSinkFunction);
我想出了如何获得我想要的行为:BidEventDataStream
必须键入 才能进行模式匹配在具有相同键的对象上。问题中的代码无需更改,但是必须编辑 BidEventDataStream
以捕获 BidEvent.getBidId()
:
BidEventDataStream.keyBy(new KeySelector<BidEvent, Long>() {
@Override
public Long getKey(BidEventvalue) {
return value.getBidId();
}
})
我正在尝试使用 FLINK-CEP 来测量市场中的出价从 BidState.OPEN
到 BidState.Closed
所花费的时间。我收到了带有 ID 和状态的出价数据流,目前我正在将所有 "OPENED" 出价与所有 "CLOSED" 出价进行匹配。
我在 patternStream.process
中有一个条件,它只允许将具有相同 ID 的开始和结束出价配对,这是应该的。但这感觉不对,因为匹配的数量以这种方式增长得非常快,而且我觉得这可以通过模式来完成。那么,有没有办法确保 "start" 和 "end" 对象具有相同的 ID?
AfterMatchSkipStrategy skipStrategy = AfterMatchSkipStrategy.noSkip();
//Is it possible to make sure that start.BidID == end.BidID in the pattern?
Pattern<BidEvent, ?> pattern = Pattern.<BidEvent>begin("start", skipStrategy).where(
new SimpleCondition<BidEvent>() {
@Override
public boolean filter(BidEvent value) {
return value.getState() == BidState.OPENED;
}
}).followedByAny("end").where(
new SimpleCondition<BidEvent>() {
@Override
public boolean filter(BidEvent value) throws Exception {
return value.getState() == BidState.CLOSED; // && value.getBidID == start.getBidID?
}
}).within(timeout);
PatternStream<BidEvent> patternStream = CEP.pattern(BidEventDataStream, pattern);
patternStream.process(new PatternProcessFunction<BidEvent, MatchingDuration>() {
@Override
public void processMatch(Map<String
, List<BidEvent>> map
, Context context
, Collector<MatchingDuration> collector) {
BidEvent start = map.get("start").get(0);
BidEvent end = map.get("end").get(0);
if (start.getBidId() == end.getBidId()){ // Make sure opening and closing bid is the same. Can this be done in the pattern?
collector.collect(new MatchingDuration(start.getBidId(), (end.getTimestamp() - start.getTimestamp())));
}
}
}).addSink(matchingDurationSinkFunction);
我想出了如何获得我想要的行为:BidEventDataStream
必须键入 才能进行模式匹配在具有相同键的对象上。问题中的代码无需更改,但是必须编辑 BidEventDataStream
以捕获 BidEvent.getBidId()
:
BidEventDataStream.keyBy(new KeySelector<BidEvent, Long>() {
@Override
public Long getKey(BidEventvalue) {
return value.getBidId();
}
})