Flink AggregateFunction 通过多个键求和(验证过程和测试)
Flink AggregateFunction find sum by multiple keys( validation process and testing)
我在 Kinesis Data Analytics 上使用 Apache flink。
弗林克版本:1.13.2
爪哇语:1.11
我正在使用来自 kafka 的 json 消息。示例输入记录如下所示
null {"plateNumber":"506b9910-74a7-4c3e-a885-b5e9717efe3a","vignetteStickerId":"9e69df3f-d728-4fc8-9b09-42104588f772","currentTimestamp":"2022/04/07 16:19:55","timestamp":1649362795.444459000,"vehicleType":"TRUCK","vehicleModelType":"TOYOTA"}
null {"plateNumber":"5ffe0326-571e-4b97-8f7b-4f49aebb6993","vignetteStickerId":"6c2e1342-b096-4cc9-a92c-df61571c2c7d","currentTimestamp":"2022/04/07 16:20:00","timestamp":1649362800.638060000,"vehicleType":"CAR","vehicleModelType":"HONDA"}
null {"plateNumber":"d15f49f9-5550-4780-b260-83f3116ba64a","vignetteStickerId":"1366fbfe-7d0a-475f-9249-261ef1dd6de2","currentTimestamp":"2022/04/07 16:20:05","timestamp":1649362805.643749000,"vehicleType":"TRUCK","vehicleModelType":"TOYOTA"}
null {"plateNumber":"803508fb-9701-438e-9028-01bb8d96a804","vignetteStickerId":"b534369f-533e-4c15-ac3f-fc28cf0f3aba","currentTimestamp":"2022/04/07 16:20:10","timestamp":1649362810.648813000,"vehicleType":"CAR","vehicleModelType":"FORD"}
我想使用 vehicleType(汽车或卡车)和 vehicleModelType(丰田、本田或福特)。 SQL 类比 (sum() ,Group by vehicleType, vehicleModelType)
我正在使用聚合函数来实现这一点。
import static java.util.Objects.isNull;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.springframework.stereotype.Component;
import com.helecloud.streams.demo.model.Vehicle;
import com.helecloud.streams.demo.model.VehicleStatistics;
@Component
public class VehicleStatisticsAggregator implements AggregateFunction<Vehicle, VehicleStatistics, VehicleStatistics> {
/**
*
*/
private static final long serialVersionUID = 1L;
@Override
public VehicleStatistics createAccumulator() {
System.out.println("Creating Accumulator!!");
return new VehicleStatistics();
}
@Override
public VehicleStatistics add(Vehicle vehicle, VehicleStatistics vehicleStatistics) {
System.out.println("vehicle in add method : " + vehicle);
if (isNull(vehicleStatistics.getVehicleType())) {
vehicleStatistics.setVehicleType(vehicle.getVehicleType());
}
if (isNull(vehicleStatistics.getVehicleModelType())) {
vehicleStatistics.setVehicleModelType(vehicle.getVehicleModelType());
}
// if(isNull(vehicleStatistics.getStart())) {
//
// vehicleStatistics.setStart(vehicle.getTimestamp());
// }
// if(isNull(vehicleStatistics.getCurrentTimestamp())) {
//
// vehicleStatistics.setCurrentTimestamp(vehicle.getCurrentTimestamp());
// }
if (isNull(vehicleStatistics.getCount())) {
vehicleStatistics.setCount(1);
} else {
System.out.println("incrementing count for : vehicleStatistics : " + vehicleStatistics);
vehicleStatistics.setCount(vehicleStatistics.getCount() + 1);
}
vehicleStatistics.setEnd(vehicle.getTimestamp());
System.out.println("vehicleStatistics in add : " + vehicleStatistics);
return vehicleStatistics;
}
@Override
public VehicleStatistics getResult(VehicleStatistics vehicleStatistics) {
System.out.println("vehicleStatistics in getResult : " + vehicleStatistics);
return vehicleStatistics;
}
@Override
public VehicleStatistics merge(VehicleStatistics vehicleStatistics, VehicleStatistics accumulator) {
System.out.println("Coming to merge!!");
VehicleStatistics vs = new VehicleStatistics(
// vehicleStatistics.getStart(),
accumulator.getEnd(),
// vehicleStatistics.getCurrentTimestamp(),
vehicleStatistics.getVehicleType(), vehicleStatistics.getVehicleModelType(),
vehicleStatistics.getCount() + accumulator.getCount());
System.out.println("VehicleStatistics in Merge :" + vs);
return vs;
}
}
在上面的代码中,我也没有看到正在调用的合并代码。
下面是主要处理代码
@Service
public class ProcessingService {
@Value("${kafka.bootstrap-servers}")
private String kafkaAddress;
@Value("${kafka.group-id}")
private String kafkaGroupId;
public static final String TOPIC = "flink_input";
public static final String VEHICLE_STATISTICS_TOPIC = "flink_output";
@Autowired
private VehicleDeserializationSchema vehicleDeserializationSchema;
@Autowired
private VehicleStatisticsSerializationSchema vehicleStatisticsSerializationSchema;
@PostConstruct
public void startFlinkStreamProcessing() {
try {
processVehicleStatistic();
} catch (Exception e) {
// log.error("Cannot process", e);
e.printStackTrace();
}
}
public void processVehicleStatistic() {
try {
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
FlinkKafkaConsumer<Vehicle> consumer = createVehicleConsumerForTopic(TOPIC, kafkaAddress, kafkaGroupId);
consumer.setStartFromLatest();
System.out.println("Starting to consume!!");
consumer.assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps());
FlinkKafkaProducer<VehicleStatistics> producer = createVehicleStatisticsProducer(VEHICLE_STATISTICS_TOPIC, kafkaAddress);
DataStream<Vehicle> inputMessagesStream = environment.addSource(consumer);
inputMessagesStream
.keyBy((vehicle -> vehicle.getVehicleType().ordinal()))
// .keyBy(vehicle -> vehicle.getVehicleModelType().ordinal())
// .keyBy(new KeySelector<Vehicle, Tuple2<VehicleType, VehicleModelType>>() {
//
// /**
// *
// */
// private static final long serialVersionUID = 1L;
//
// @Override
// public Tuple2<VehicleType, VehicleModelType> getKey(Vehicle vehicle) throws Exception {
// return Tuple2.of(vehicle.getVehicleType(), vehicle.getVehicleModelType());
// }
// })
// .filter(v -> CAR.equals(v.getVehicleType()))
.window(TumblingEventTimeWindows.of(Time.seconds(20)))
// .windowAll(TumblingEventTimeWindows.of(Time.seconds(10)))
.aggregate(new VehicleStatisticsAggregator())
.addSink(producer);
System.out.println("Adding to Sink!!");
environment.execute("Car Truck Counts By Model");
} catch(Exception e) {
e.printStackTrace();;
}
}
private FlinkKafkaConsumer<Vehicle> createVehicleConsumerForTopic(String topic, String kafkaAddress, String kafkaGroup ) {
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", kafkaAddress);
properties.setProperty("group.id", kafkaGroup);
return new FlinkKafkaConsumer<>(topic, vehicleDeserializationSchema, properties);
}
private FlinkKafkaProducer<VehicleStatistics> createVehicleStatisticsProducer(String topic, String kafkaAddress){
return new FlinkKafkaProducer<>(kafkaAddress, topic, vehicleStatisticsSerializationSchema);
}
}
我得到的结果如下。
null {"end":1649362835.665466000,"vehicleType":"TRUCK","vehicleModelType":"HONDA","count":3}
null {"end":1649362825.656024000,"vehicleType":"CAR","vehicleModelType":"TOYOTA","count":1}
null {"end":1649362850.675786000,"vehicleType":"CAR","vehicleModelType":"TOYOTA","count":3}
null {"end":1649362855.677596000,"vehicleType":"TRUCK","vehicleModelType":"TOYOTA","count":1}
但是有没有办法验证这一点?
另外一个问题是我正在尝试基于多个键聚合结果是 AggregateFunction 正确的方法。
我看到这个就问这个How can I sum multiple fields in Flink?
所以如果我必须在多个字段上聚合总和,聚合函数可以完成同样的事情吗?(我写代码的方式)
请告诉我。提前致谢。
Merge 只有在您使用 windows 合并时才会被调用——换句话说,如果您使用会话 windows 或自定义合并 window.
基于多个键聚合的正确方法是使用keyBy
和复合类型,例如Tuple2<VehicleType, VehicleModelType>>
。每次调用 keyBy
时,流都会从头开始重新分区(而不是在之前的任何分区之外)。
我在 Kinesis Data Analytics 上使用 Apache flink。
弗林克版本:1.13.2 爪哇语:1.11
我正在使用来自 kafka 的 json 消息。示例输入记录如下所示
null {"plateNumber":"506b9910-74a7-4c3e-a885-b5e9717efe3a","vignetteStickerId":"9e69df3f-d728-4fc8-9b09-42104588f772","currentTimestamp":"2022/04/07 16:19:55","timestamp":1649362795.444459000,"vehicleType":"TRUCK","vehicleModelType":"TOYOTA"}
null {"plateNumber":"5ffe0326-571e-4b97-8f7b-4f49aebb6993","vignetteStickerId":"6c2e1342-b096-4cc9-a92c-df61571c2c7d","currentTimestamp":"2022/04/07 16:20:00","timestamp":1649362800.638060000,"vehicleType":"CAR","vehicleModelType":"HONDA"}
null {"plateNumber":"d15f49f9-5550-4780-b260-83f3116ba64a","vignetteStickerId":"1366fbfe-7d0a-475f-9249-261ef1dd6de2","currentTimestamp":"2022/04/07 16:20:05","timestamp":1649362805.643749000,"vehicleType":"TRUCK","vehicleModelType":"TOYOTA"}
null {"plateNumber":"803508fb-9701-438e-9028-01bb8d96a804","vignetteStickerId":"b534369f-533e-4c15-ac3f-fc28cf0f3aba","currentTimestamp":"2022/04/07 16:20:10","timestamp":1649362810.648813000,"vehicleType":"CAR","vehicleModelType":"FORD"}
我想使用 vehicleType(汽车或卡车)和 vehicleModelType(丰田、本田或福特)。 SQL 类比 (sum() ,Group by vehicleType, vehicleModelType)
我正在使用聚合函数来实现这一点。
import static java.util.Objects.isNull;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.springframework.stereotype.Component;
import com.helecloud.streams.demo.model.Vehicle;
import com.helecloud.streams.demo.model.VehicleStatistics;
@Component
public class VehicleStatisticsAggregator implements AggregateFunction<Vehicle, VehicleStatistics, VehicleStatistics> {
/**
*
*/
private static final long serialVersionUID = 1L;
@Override
public VehicleStatistics createAccumulator() {
System.out.println("Creating Accumulator!!");
return new VehicleStatistics();
}
@Override
public VehicleStatistics add(Vehicle vehicle, VehicleStatistics vehicleStatistics) {
System.out.println("vehicle in add method : " + vehicle);
if (isNull(vehicleStatistics.getVehicleType())) {
vehicleStatistics.setVehicleType(vehicle.getVehicleType());
}
if (isNull(vehicleStatistics.getVehicleModelType())) {
vehicleStatistics.setVehicleModelType(vehicle.getVehicleModelType());
}
// if(isNull(vehicleStatistics.getStart())) {
//
// vehicleStatistics.setStart(vehicle.getTimestamp());
// }
// if(isNull(vehicleStatistics.getCurrentTimestamp())) {
//
// vehicleStatistics.setCurrentTimestamp(vehicle.getCurrentTimestamp());
// }
if (isNull(vehicleStatistics.getCount())) {
vehicleStatistics.setCount(1);
} else {
System.out.println("incrementing count for : vehicleStatistics : " + vehicleStatistics);
vehicleStatistics.setCount(vehicleStatistics.getCount() + 1);
}
vehicleStatistics.setEnd(vehicle.getTimestamp());
System.out.println("vehicleStatistics in add : " + vehicleStatistics);
return vehicleStatistics;
}
@Override
public VehicleStatistics getResult(VehicleStatistics vehicleStatistics) {
System.out.println("vehicleStatistics in getResult : " + vehicleStatistics);
return vehicleStatistics;
}
@Override
public VehicleStatistics merge(VehicleStatistics vehicleStatistics, VehicleStatistics accumulator) {
System.out.println("Coming to merge!!");
VehicleStatistics vs = new VehicleStatistics(
// vehicleStatistics.getStart(),
accumulator.getEnd(),
// vehicleStatistics.getCurrentTimestamp(),
vehicleStatistics.getVehicleType(), vehicleStatistics.getVehicleModelType(),
vehicleStatistics.getCount() + accumulator.getCount());
System.out.println("VehicleStatistics in Merge :" + vs);
return vs;
}
}
在上面的代码中,我也没有看到正在调用的合并代码。 下面是主要处理代码
@Service
public class ProcessingService {
@Value("${kafka.bootstrap-servers}")
private String kafkaAddress;
@Value("${kafka.group-id}")
private String kafkaGroupId;
public static final String TOPIC = "flink_input";
public static final String VEHICLE_STATISTICS_TOPIC = "flink_output";
@Autowired
private VehicleDeserializationSchema vehicleDeserializationSchema;
@Autowired
private VehicleStatisticsSerializationSchema vehicleStatisticsSerializationSchema;
@PostConstruct
public void startFlinkStreamProcessing() {
try {
processVehicleStatistic();
} catch (Exception e) {
// log.error("Cannot process", e);
e.printStackTrace();
}
}
public void processVehicleStatistic() {
try {
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
FlinkKafkaConsumer<Vehicle> consumer = createVehicleConsumerForTopic(TOPIC, kafkaAddress, kafkaGroupId);
consumer.setStartFromLatest();
System.out.println("Starting to consume!!");
consumer.assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps());
FlinkKafkaProducer<VehicleStatistics> producer = createVehicleStatisticsProducer(VEHICLE_STATISTICS_TOPIC, kafkaAddress);
DataStream<Vehicle> inputMessagesStream = environment.addSource(consumer);
inputMessagesStream
.keyBy((vehicle -> vehicle.getVehicleType().ordinal()))
// .keyBy(vehicle -> vehicle.getVehicleModelType().ordinal())
// .keyBy(new KeySelector<Vehicle, Tuple2<VehicleType, VehicleModelType>>() {
//
// /**
// *
// */
// private static final long serialVersionUID = 1L;
//
// @Override
// public Tuple2<VehicleType, VehicleModelType> getKey(Vehicle vehicle) throws Exception {
// return Tuple2.of(vehicle.getVehicleType(), vehicle.getVehicleModelType());
// }
// })
// .filter(v -> CAR.equals(v.getVehicleType()))
.window(TumblingEventTimeWindows.of(Time.seconds(20)))
// .windowAll(TumblingEventTimeWindows.of(Time.seconds(10)))
.aggregate(new VehicleStatisticsAggregator())
.addSink(producer);
System.out.println("Adding to Sink!!");
environment.execute("Car Truck Counts By Model");
} catch(Exception e) {
e.printStackTrace();;
}
}
private FlinkKafkaConsumer<Vehicle> createVehicleConsumerForTopic(String topic, String kafkaAddress, String kafkaGroup ) {
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", kafkaAddress);
properties.setProperty("group.id", kafkaGroup);
return new FlinkKafkaConsumer<>(topic, vehicleDeserializationSchema, properties);
}
private FlinkKafkaProducer<VehicleStatistics> createVehicleStatisticsProducer(String topic, String kafkaAddress){
return new FlinkKafkaProducer<>(kafkaAddress, topic, vehicleStatisticsSerializationSchema);
}
}
我得到的结果如下。
null {"end":1649362835.665466000,"vehicleType":"TRUCK","vehicleModelType":"HONDA","count":3}
null {"end":1649362825.656024000,"vehicleType":"CAR","vehicleModelType":"TOYOTA","count":1}
null {"end":1649362850.675786000,"vehicleType":"CAR","vehicleModelType":"TOYOTA","count":3}
null {"end":1649362855.677596000,"vehicleType":"TRUCK","vehicleModelType":"TOYOTA","count":1}
但是有没有办法验证这一点?
另外一个问题是我正在尝试基于多个键聚合结果是 AggregateFunction 正确的方法。
我看到这个就问这个How can I sum multiple fields in Flink?
所以如果我必须在多个字段上聚合总和,聚合函数可以完成同样的事情吗?(我写代码的方式)
请告诉我。提前致谢。
Merge 只有在您使用 windows 合并时才会被调用——换句话说,如果您使用会话 windows 或自定义合并 window.
基于多个键聚合的正确方法是使用keyBy
和复合类型,例如Tuple2<VehicleType, VehicleModelType>>
。每次调用 keyBy
时,流都会从头开始重新分区(而不是在之前的任何分区之外)。