Flink AggregateFunction sum by multiple keys (validation process and testing)

I am using Apache Flink on Kinesis Data Analytics.

Flink version: 1.13.2, Java: 11

I am consuming JSON messages from Kafka. A sample input record looks like this:

null    {"plateNumber":"506b9910-74a7-4c3e-a885-b5e9717efe3a","vignetteStickerId":"9e69df3f-d728-4fc8-9b09-42104588f772","currentTimestamp":"2022/04/07 16:19:55","timestamp":1649362795.444459000,"vehicleType":"TRUCK","vehicleModelType":"TOYOTA"}
null    {"plateNumber":"5ffe0326-571e-4b97-8f7b-4f49aebb6993","vignetteStickerId":"6c2e1342-b096-4cc9-a92c-df61571c2c7d","currentTimestamp":"2022/04/07 16:20:00","timestamp":1649362800.638060000,"vehicleType":"CAR","vehicleModelType":"HONDA"}
null    {"plateNumber":"d15f49f9-5550-4780-b260-83f3116ba64a","vignetteStickerId":"1366fbfe-7d0a-475f-9249-261ef1dd6de2","currentTimestamp":"2022/04/07 16:20:05","timestamp":1649362805.643749000,"vehicleType":"TRUCK","vehicleModelType":"TOYOTA"}
null    {"plateNumber":"803508fb-9701-438e-9028-01bb8d96a804","vignetteStickerId":"b534369f-533e-4c15-ac3f-fc28cf0f3aba","currentTimestamp":"2022/04/07 16:20:10","timestamp":1649362810.648813000,"vehicleType":"CAR","vehicleModelType":"FORD"}

I want to aggregate by vehicleType (CAR or TRUCK) and vehicleModelType (TOYOTA, HONDA, or FORD). The SQL analogy would be sum() with GROUP BY vehicleType, vehicleModelType.

I am using an AggregateFunction to achieve this.

import static java.util.Objects.isNull;

import org.apache.flink.api.common.functions.AggregateFunction;
import org.springframework.stereotype.Component;

import com.helecloud.streams.demo.model.Vehicle;
import com.helecloud.streams.demo.model.VehicleStatistics;

@Component
public class VehicleStatisticsAggregator implements AggregateFunction<Vehicle, VehicleStatistics, VehicleStatistics> {

    /**
     * 
     */
    private static final long serialVersionUID = 1L;

    @Override
    public VehicleStatistics createAccumulator() {
        System.out.println("Creating Accumulator!!");
        return new VehicleStatistics();
    }

    @Override
    public VehicleStatistics add(Vehicle vehicle, VehicleStatistics vehicleStatistics) {

        System.out.println("vehicle in add method : " + vehicle);

        if (isNull(vehicleStatistics.getVehicleType())) {
            vehicleStatistics.setVehicleType(vehicle.getVehicleType());
        }

        if (isNull(vehicleStatistics.getVehicleModelType())) {
            vehicleStatistics.setVehicleModelType(vehicle.getVehicleModelType());
        }

//    if(isNull(vehicleStatistics.getStart())) {
//
//      vehicleStatistics.setStart(vehicle.getTimestamp());
//    }

//    if(isNull(vehicleStatistics.getCurrentTimestamp())) {
//
//        vehicleStatistics.setCurrentTimestamp(vehicle.getCurrentTimestamp());
//      }

        if (isNull(vehicleStatistics.getCount())) {

            vehicleStatistics.setCount(1);
        } else {

            System.out.println("incrementing count for : vehicleStatistics : " + vehicleStatistics);
            vehicleStatistics.setCount(vehicleStatistics.getCount() + 1);
        }

        vehicleStatistics.setEnd(vehicle.getTimestamp());

        System.out.println("vehicleStatistics in add : " + vehicleStatistics);

        return vehicleStatistics;
    }

    @Override
    public VehicleStatistics getResult(VehicleStatistics vehicleStatistics) {
        System.out.println("vehicleStatistics in getResult : " + vehicleStatistics);
        return vehicleStatistics;
    }

    @Override
    public VehicleStatistics merge(VehicleStatistics vehicleStatistics, VehicleStatistics accumulator) {

        System.out.println("Coming to merge!!");

        VehicleStatistics vs = new VehicleStatistics(
                // vehicleStatistics.getStart(),
                accumulator.getEnd(),
                // vehicleStatistics.getCurrentTimestamp(),
                vehicleStatistics.getVehicleType(), vehicleStatistics.getVehicleModelType(),
                vehicleStatistics.getCount() + accumulator.getCount());

        System.out.println("VehicleStatistics in Merge :" + vs);

        return vs;

    }
}

In the code above, I also don't see the merge method being called. Below is the main processing code:

@Service
public class ProcessingService {

  @Value("${kafka.bootstrap-servers}")
  private String kafkaAddress;

  @Value("${kafka.group-id}")
  private String kafkaGroupId;

  public static final String TOPIC = "flink_input";

  public static final String VEHICLE_STATISTICS_TOPIC = "flink_output";

  @Autowired
  private  VehicleDeserializationSchema vehicleDeserializationSchema;

  @Autowired
  private  VehicleStatisticsSerializationSchema vehicleStatisticsSerializationSchema;

  @PostConstruct
  public void startFlinkStreamProcessing() {
    try {

      processVehicleStatistic();
    } catch (Exception e) {

     // log.error("Cannot process", e);
        e.printStackTrace();
    }
  }

  public void processVehicleStatistic()  {
      
      
    try {
        
         StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();

            FlinkKafkaConsumer<Vehicle> consumer = createVehicleConsumerForTopic(TOPIC, kafkaAddress, kafkaGroupId);

            consumer.setStartFromLatest();
            
            System.out.println("Starting to consume!!");

            consumer.assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps());

            FlinkKafkaProducer<VehicleStatistics> producer = createVehicleStatisticsProducer(VEHICLE_STATISTICS_TOPIC, kafkaAddress);

            DataStream<Vehicle> inputMessagesStream = environment.addSource(consumer);
            
            

            inputMessagesStream
            .keyBy((vehicle -> vehicle.getVehicleType().ordinal()))
          //  .keyBy(vehicle -> vehicle.getVehicleModelType().ordinal())
//              .keyBy(new KeySelector<Vehicle, Tuple2<VehicleType, VehicleModelType>>() {
//      
//                  /**
//                   * 
//                   */
//                  private static final long serialVersionUID = 1L;
//      
//                  @Override
//                  public Tuple2<VehicleType, VehicleModelType> getKey(Vehicle vehicle) throws Exception {
//                    return Tuple2.of(vehicle.getVehicleType(), vehicle.getVehicleModelType());
//                  }
//                })
               
//              .filter(v -> CAR.equals(v.getVehicleType()))
                .window(TumblingEventTimeWindows.of(Time.seconds(20)))
//              .windowAll(TumblingEventTimeWindows.of(Time.seconds(10)))
                .aggregate(new VehicleStatisticsAggregator())
             
            .addSink(producer);
            
            System.out.println("Adding to Sink!!");

            environment.execute("Car Truck Counts By Model");

        
    } catch(Exception e) {
        e.printStackTrace();
    }

  }

  private FlinkKafkaConsumer<Vehicle> createVehicleConsumerForTopic(String topic, String kafkaAddress, String kafkaGroup ) {

    Properties properties = new Properties();

    properties.setProperty("bootstrap.servers", kafkaAddress);

    properties.setProperty("group.id", kafkaGroup);

    return new FlinkKafkaConsumer<>(topic, vehicleDeserializationSchema, properties);

  }

  private FlinkKafkaProducer<VehicleStatistics> createVehicleStatisticsProducer(String topic, String kafkaAddress){

    return new FlinkKafkaProducer<>(kafkaAddress, topic, vehicleStatisticsSerializationSchema);
  }

}

I am getting results like the following:

null    {"end":1649362835.665466000,"vehicleType":"TRUCK","vehicleModelType":"HONDA","count":3}
null    {"end":1649362825.656024000,"vehicleType":"CAR","vehicleModelType":"TOYOTA","count":1}
null    {"end":1649362850.675786000,"vehicleType":"CAR","vehicleModelType":"TOYOTA","count":3}
null    {"end":1649362855.677596000,"vehicleType":"TRUCK","vehicleModelType":"TOYOTA","count":1}

But is there a way to validate this?
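One way to cross-check the windowed counts is to replay the sample records through the same grouping logic in plain Java and compare the per-key totals. A minimal sketch (the key pairs come from the four sample records above; window boundaries are ignored, so only the overall per-key totals are comparable, not the per-window splits):

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class GroupByValidation {

    // Count records per (vehicleType, vehicleModelType) composite key,
    // mirroring SQL: SELECT ..., count(*) GROUP BY vehicleType, vehicleModelType
    static Map<String, Integer> groupCounts(List<String[]> records) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String[] r : records) {
            counts.merge(r[0] + "/" + r[1], 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // (vehicleType, vehicleModelType) pairs taken from the four sample input records
        List<String[]> records = List.of(
                new String[]{"TRUCK", "TOYOTA"},
                new String[]{"CAR", "HONDA"},
                new String[]{"TRUCK", "TOYOTA"},
                new String[]{"CAR", "FORD"});
        System.out.println(groupCounts(records)); // {TRUCK/TOYOTA=2, CAR/HONDA=1, CAR/FORD=1}
    }
}
```

Notice that the sample input never contains a TRUCK/HONDA record, so the `{"vehicleType":"TRUCK","vehicleModelType":"HONDA","count":3}` line in the output is a hint that the job is not actually keying on both fields.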

Another question: since I am trying to aggregate results based on multiple keys, is AggregateFunction the right approach?

I am asking this after seeing How can I sum multiple fields in Flink?

So if I have to aggregate sums over multiple fields, can an AggregateFunction accomplish the same thing, the way I have written the code?

Please let me know. Thanks in advance.

Merge is only called if you are using windows that merge -- in other words, session windows or a custom merging window.
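The merge contract itself is simple: combine two partial accumulators into one accumulator equivalent to having added all elements of both. A simplified sketch outside Flink, assuming only the count and end fields of VehicleStatistics matter (the class names here are illustrative, not from the original code):

```java
public class MergeSketch {

    // Simplified stand-in for VehicleStatistics: just a count and an end timestamp.
    static final class Acc {
        final long count;
        final double end;
        Acc(long count, double end) { this.count = count; this.end = end; }
    }

    // Merging two partials: counts add, and the later end timestamp wins,
    // so the result is order-independent (merge(a, b) equals merge(b, a)).
    static Acc merge(Acc a, Acc b) {
        return new Acc(a.count + b.count, Math.max(a.end, b.end));
    }

    public static void main(String[] args) {
        Acc merged = merge(new Acc(3, 1649362835.6), new Acc(1, 1649362855.7));
        System.out.println(merged.count); // 4
    }
}
```

Note that the merge in the question instead copies accumulator.getEnd() unconditionally, so its result depends on argument order; with tumbling windows this never runs, but it would matter for merging windows.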

The correct way to aggregate based on multiple keys is to use keyBy with a composite type, such as Tuple2&lt;VehicleType, VehicleModelType&gt;. Each time you call keyBy the stream is repartitioned from scratch (not on top of any previous partitioning).
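Whatever composite type is used as the key -- a Tuple2 or a POJO -- Flink requires it to have a deterministic hashCode and a valid equals, because the key is used for hash partitioning. A minimal sketch of such a key with a hypothetical CompositeKey POJO (the name is illustrative; a plain HashMap stands in for the partitioning to show that equal key pairs collapse into one group):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class CompositeKeyDemo {

    // Hypothetical POJO key over both grouping fields; equals/hashCode ensure
    // records with the same (type, model) pair land in the same group.
    static final class CompositeKey {
        final String vehicleType;
        final String vehicleModelType;

        CompositeKey(String vehicleType, String vehicleModelType) {
            this.vehicleType = vehicleType;
            this.vehicleModelType = vehicleModelType;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof CompositeKey)) return false;
            CompositeKey k = (CompositeKey) o;
            return vehicleType.equals(k.vehicleType)
                    && vehicleModelType.equals(k.vehicleModelType);
        }

        @Override
        public int hashCode() {
            return Objects.hash(vehicleType, vehicleModelType);
        }
    }

    public static void main(String[] args) {
        Map<CompositeKey, Integer> counts = new HashMap<>();
        counts.merge(new CompositeKey("TRUCK", "TOYOTA"), 1, Integer::sum);
        counts.merge(new CompositeKey("TRUCK", "TOYOTA"), 1, Integer::sum);
        // Both records collapse into a single key with count 2.
        System.out.println(counts.size() + " " + counts.get(new CompositeKey("TRUCK", "TOYOTA")));
    }
}
```

The single-field `keyBy(vehicle -> vehicle.getVehicleType().ordinal())` in the question groups only by vehicleType, which is why model types get mixed within one result row.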