Why is the delay between the ingestion time of the last matched event and the time CEP matches and fires the pattern so long?
I wrote a very simple CEP pattern that just matches a sequence of three events, 'a' 'b' 'c'. Yet even for such a simple pattern, Flink (tested on both 1.5 and 1.4.2) takes almost 1 second (sometimes about 0.5 seconds) between the ingestion time of the last matched event and the time CEP matches the pattern and fires.
The test results are below. Note that the last two fields, ingestionTimestamp and timestamp, are the ingestion time of the last matched event and the timestamp at which CEP fired the matched pattern.
My question is: how can I improve this performance? Is there something like setBufferTimeout for CEP? I tried setting the buffer timeout to 5 ms, but it did not help.
Test results:
3> Transport{prodId=411, from='a', to='b', ingestionTimestamp='1528384356501', timestamp='1528384357034'} Transport{prodId=411, from='b', to='c', ingestionTimestamp='1528384356502', timestamp='1528384357034'} Transport{prodId=411, from='c', to='d', ingestionTimestamp='1528384356505', timestamp='1528384357034'}
3> Transport{prodId=415, from='a', to='b', ingestionTimestamp='1528384356530', timestamp='1528384357034'} Transport{prodId=415, from='b', to='c', ingestionTimestamp='1528384356532', timestamp='1528384357034'} Transport{prodId=415, from='c', to='d', ingestionTimestamp='1528384356534', timestamp='1528384357034'}
3> Transport{prodId=419, from='a', to='b', ingestionTimestamp='1528384356549', timestamp='1528384357034'} Transport{prodId=419, from='b', to='c', ingestionTimestamp='1528384356549', timestamp='1528384357034'} Transport{prodId=419, from='c', to='d', ingestionTimestamp='1528384356554', timestamp='1528384357034'}
Here is the code:
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternFlatSelectFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.util.Collector;

public class RetailerExampleKafka {

    private static final String LOCAL_KAFKA_BROKER = "localhost:9092";

    /**
     * The initial source of our shipment.
     */
    private static final String SRC = "a";

    // Matches the strictly consecutive sequence a -> b -> c within 5 seconds.
    private static final Pattern<Transport, ?> pattern = Pattern.<Transport>begin("start")
            .where(new SimpleCondition<Transport>() {
                private static final long serialVersionUID = 314415972814127035L;

                @Override
                public boolean filter(Transport value) throws Exception {
                    return Objects.equals(value.getFrom(), SRC);
                }
            }).next("middle").where(new SimpleCondition<Transport>() {
                private static final long serialVersionUID = 6664468385615273240L;

                @Override
                public boolean filter(Transport value) {
                    return value.getFrom().startsWith("b");
                }
            }).next("end").where(new SimpleCondition<Transport>() {
                private static final long serialVersionUID = 5721311694340771858L;

                @Override
                public boolean filter(Transport value) {
                    return value.getFrom().startsWith("c");
                }
            }).within(Time.milliseconds(5000));

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
        env.setBufferTimeout(5);
        env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
        env.getConfig().disableSysoutLogging();
        env.getConfig().setAutoWatermarkInterval(1000);

        ParameterTool params = ParameterTool.fromArgs(args);
        String inputTopic = params.getRequired("input-topic");
        String groupID = params.getRequired("group-id");

        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", LOCAL_KAFKA_BROKER);
        kafkaProps.setProperty("group.id", groupID);
        kafkaProps.setProperty("auto.offset.reset", "earliest");

        // Create a Kafka consumer; TransportSchema (not shown) deserializes
        // the Kafka records into Transport objects.
        FlinkKafkaConsumer011<Transport> consumer = new FlinkKafkaConsumer011<>(
                inputTopic,
                new TransportSchema(),
                kafkaProps);

        // Key by product id and stamp each record with its ingestion timestamp.
        DataStream<Transport> rides = env.addSource(consumer)
                .keyBy(element -> element.getProductId())
                .process(new MatchFunction2());

        CEP.pattern(rides, pattern).flatSelect(new PatternFlatSelectFunction<Transport, String>() {
            private static final long serialVersionUID = -8972838879934875538L;

            @Override
            public void flatSelect(Map<String, List<Transport>> map, Collector<String> collector) throws Exception {
                StringBuilder str = new StringBuilder();
                for (Map.Entry<String, List<Transport>> entry : map.entrySet()) {
                    for (Transport t : entry.getValue()) {
                        // Record when CEP fired the match.
                        t.timestamp = System.currentTimeMillis();
                        str.append(t).append(' ');
                    }
                }
                collector.collect(str.toString());
            }
        }).print();

        env.execute();
    }

    /**
     * Our input records. Each contains:
     * 1. the id of the product,
     * 2. the starting location of the shipment, and
     * 3. the final location of the shipment.
     */
    public static class Transport {
        private final int prodId;
        private final String from;
        private final String to;
        private long timestamp;
        public long ingestionTimestamp;

        public Transport(int productId, String from, String to, long timestamp) {
            this.prodId = productId;
            this.from = from;
            this.to = to;
            this.timestamp = timestamp;
        }

        public int getProductId() {
            return prodId;
        }

        public String getFrom() {
            return from;
        }

        public String getTo() {
            return to;
        }

        public long getTimestamp() {
            return timestamp;
        }

        @Override
        public String toString() {
            return "Transport{" +
                    "prodId=" + prodId +
                    ", from='" + from + '\'' +
                    ", to='" + to + '\'' +
                    ", ingestionTimestamp='" + ingestionTimestamp + '\'' +
                    ", timestamp='" + timestamp + '\'' +
                    '}';
        }

        public static Transport fromString(String line) {
            String[] split = line.split(",");
            return new Transport(Integer.valueOf(split[0]), split[1], split[2], Long.valueOf(split[3]));
        }
    }

    private static Tuple2<String, Integer> getLastDestinationAndStopCountForPattern(IterativeCondition.Context<Transport> ctx, String patternName) {
        return getLastDestinationAndStopCountForPattern(ctx.getEventsForPattern(patternName));
    }

    private static Tuple2<String, Integer> getLastDestinationAndStopCountForPattern(Iterable<Transport> events) {
        Tuple2<String, Integer> locationAndStopCount = new Tuple2<>("", 0);
        for (Transport transport : events) {
            locationAndStopCount.f0 = transport.getTo();
            locationAndStopCount.f1++;
        }
        return locationAndStopCount;
    }

    // Copies the ingestion timestamp (assigned automatically under the
    // IngestionTime characteristic) into the element so it can be printed later.
    public static class MatchFunction2 extends ProcessFunction<Transport, Transport> {
        @Override
        public void processElement(Transport ride, Context context, Collector<Transport> out) throws Exception {
            ride.ingestionTimestamp = context.timestamp();
            out.collect(ride);
        }
    }
}
This is because IngestionTime, like EventTime, generates timestamps and watermarks automatically: timestamps are assigned from the system clock, and watermarks are emitted once every watermarkInterval.
In your case you set .setAutoWatermarkInterval(1000), so a watermark is generated once per second. Because the order of events is crucial for the CEP library, elements are buffered, sorted when a watermark arrives, and only then processed. Hence the difference of up to 1 second.
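If that latency is too high, the most direct lever this explanation suggests is shrinking the auto-watermark interval, so CEP gets to sort and process its buffer more often. A minimal sketch against the environment from the code above; the 100 ms value is an illustrative assumption, not a tested recommendation:

// Hypothetical tuning: emit watermarks every 100 ms instead of every 1000 ms.
// CEP can then evaluate its sorted buffer ten times as often, so the gap
// between ingestionTimestamp and timestamp should drop to roughly 100 ms.
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
env.getConfig().setAutoWatermarkInterval(100);

The trade-off is more frequent watermark traffic through the pipeline, which is usually negligible at this interval.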
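If you need latency below any fixed interval, another option is to switch to event time and emit a punctuated watermark after every element, so the CEP operator never waits for the next periodic watermark. A hedged sketch using the env and consumer from the code above, assuming records already arrive in order per key; using the arrival wall-clock time as the event timestamp mirrors ingestion-time semantics and is also an assumption of this sketch:

import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

DataStream<Transport> stamped = env.addSource(consumer)
        .assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<Transport>() {
            @Override
            public long extractTimestamp(Transport element, long previousElementTimestamp) {
                // Assumption: use arrival time as the event timestamp.
                return System.currentTimeMillis();
            }

            @Override
            public Watermark checkAndGetNextWatermark(Transport lastElement, long extractedTimestamp) {
                // Emit a watermark right after every element, so CEP can
                // process each record as soon as it arrives.
                return new Watermark(extractedTimestamp);
            }
        });

Note that a watermark per element adds overhead on high-throughput streams, and the per-key ordering assumption must actually hold, otherwise out-of-order elements will be treated as late by the CEP operator.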