Flink: connect streams using KeyedCoProcessFunction

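For a 1:1 join I am using KeyedCoProcessFunction. I have two streams: a lookup stream (100 records per second) and a clickstream (10,000 records per second). In the processElement2 method I look up the key in a MapState<Long,Row>. If it is found, I enrich the clickstream record with it; otherwise I emit the record to a side output and sink that side output to Kafka. I am not using any window on either input stream. On the DLQ topic in Kafka I continuously see 1-2 records per second. How can I wait a few milliseconds for the lookup id in processElement2 before pushing the record to the side output?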

val joinStream = lookDataStream.keyBy(row -> row.<Long>getFieldAs("id"))
            .connect(clickDataStream.keyBy(row -> row.<Long>getFieldAs("lookupid")))
            .process(new EnrichJoinFunction());
public static class EnrichJoinFunction
      extends KeyedCoProcessFunction<Long, Row, Row, Row> {


    final OutputTag<Row> outputTag = new OutputTag<Row>("side-output") {};

    private MapState<Long, Row> map = null;

    @Override
    public void open(Configuration parameters) throws Exception {
      val MapStateDescriptor =
          new MapStateDescriptor<Long, Row>(
              "state",
              TypeInformation.of(Long.class),
              TypeInformation.of(new TypeHint<Row>() {}));
      MapStateDescriptor.enableTimeToLive(StateTtlConfig.newBuilder(Time.days(15)).build());
      /*MapStateDescriptor.setQueryable("test");*/
      map = getRuntimeContext().getMapState(MapStateDescriptor);
    }

    @Override
    public void processElement1(
        Row lookupRow, KeyedCoProcessFunction<Long, Row, Row, Row>.Context ctx, Collector<Row> out)
        throws Exception {
      log.debug("Received Lookup Record" + RowUtils.printRow(lookupRow));
      val id = lookupRow.<Long>getFieldAs("id");
      if (!map.contains(id)) {
        map.put(id, lookupRow);
      }
    }

    @Override
    public void processElement2(
        Row clickRow, KeyedCoProcessFunction<Long, Row, Row, Row>.Context ctx, Collector<Row> out)
        throws Exception {
      log.debug("Received Click stream Record" + RowUtils.printRow(clickRow));

      val id = clickRow.<Long>getFieldAs("id");

      if (map.contains(id)) {
          // enrich join
          val joinRow = join(clickRow, map.get(id));
          out.collect(joinRow);
      } else {
        // lookup entry not yet arrived, send it to side output - dlq
        ctx.output(outputTag, clickRow);
      }
    }

    public Row join(Row clickRow, Row lookupRow) throws ParseException {
      Row joinedRow = new Row(RowKind.INSERT, 13);
      // row setter join ouput
      return joinedRow;
    }
}}

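You can use the TimerService to achieve this.

The idea is to store clickstream rows that have no immediately matching lookup data in a dedicated MapState<Long,Row>, and to register a processing-time or event-time timer that fires after a short delay. In the timer callback you try the join between the lookup data and the clickstream data again. If there is still no match, you finally send the click event to the side output.

It could look like this: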

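// Imports used by this snippet (the class itself is nested inside the job class).
// `log` and `RowUtils` come from the question's own code base.
import java.text.ParseException;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public static class EnrichJoinFunction
      extends KeyedCoProcessFunction<Long, Row, Row, Row> {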


    final OutputTag<Row> outputTag = new OutputTag<Row>("side-output") {};

    private MapState<Long, Row> map = null;
    private MapState<Long, Row> clickstreamState = null;

    @Override
    public void open(Configuration parameters) throws Exception {
      MapStateDescriptor<Long, Row> MapStateDescriptor =
          new MapStateDescriptor<Long, Row>(
              "state",
              TypeInformation.of(Long.class),
              TypeInformation.of(new TypeHint<Row>() {}));
      MapStateDescriptor.enableTimeToLive(StateTtlConfig.newBuilder(Time.days(15)).build());
      /*MapStateDescriptor.setQueryable("test");*/
      map = getRuntimeContext().getMapState(MapStateDescriptor);

      MapStateDescriptor<Long, Row> clickstreamStateMapStateDescriptor =
          new MapStateDescriptor<Long, Row>(
              "clickstreamState",
              TypeInformation.of(Long.class),
              TypeInformation.of(new TypeHint<Row>() {}));
      clickstreamStateMapStateDescriptor.enableTimeToLive(StateTtlConfig.newBuilder(Time.minutes(1)).build());
      clickstreamState = getRuntimeContext().getMapState(clickstreamStateMapStateDescriptor);
    }

    @Override
    public void processElement1(
        Row lookupRow, KeyedCoProcessFunction<Long, Row, Row, Row>.Context ctx, Collector<Row> out)
        throws Exception {
      log.debug("Received Lookup Record" + RowUtils.printRow(lookupRow));
      Long id = lookupRow.<Long>getFieldAs("id");
      if (!map.contains(id)) {
        map.put(id, lookupRow);
      }

      // join immediately any matching click events, waiting for counterpart
      if (clickstreamState.contains(id)) {
          // enrich join
          Row joinRow = join(clickstreamState.get(id), lookupRow);
          out.collect(joinRow);
          clickstreamState.remove(id);
      } 
    }

    @Override
    public void processElement2(
        Row clickRow, KeyedCoProcessFunction<Long, Row, Row, Row>.Context ctx, Collector<Row> out)
        throws Exception {
      log.debug("Received Click stream Record" + RowUtils.printRow(clickRow));

      Long id = clickRow.<Long>getFieldAs("id");

      if (map.contains(id)) {
          // enrich join
          Row joinRow = join(clickRow, map.get(id));
          out.collect(joinRow);
      } else {
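        // no lookup row yet: buffer the click and re-check in 1 second
        clickstreamState.put(id, clickRow);
        // a processing-time timer must be based on the processing-time clock;
        // ctx.timestamp() is the record's event timestamp and may be null
        long fireAt = ctx.timerService().currentProcessingTime() + 1000;
        ctx.timerService().registerProcessingTimeTimer(fireAt);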
      }
    }

    public Row join(Row clickRow, Row lookupRow) throws ParseException {
      Row joinedRow = new Row(RowKind.INSERT, 13);
      // row setter join ouput
      return joinedRow;
    }

    @Override
    public void onTimer(
        long timestamp,
        KeyedCoProcessFunction<Long, Row, Row, Row>.OnTimerContext ctx, Collector<Row> out)
        throws Exception {
      Long id = ctx.getCurrentKey();
      Row clickRow = clickstreamState.get(id);
      if (clickRow == null) {
        // already joined in processElement1 (or expired by TTL), nothing to do
        return;
      }
      if (map.contains(id)) {
        // enrich join
        Row joinRow = join(clickRow, map.get(id));
        out.collect(joinRow);
      } else {
        // lookup entry has not arrived even after 1 second, send it to side output - dlq
        ctx.output(outputTag, clickRow);
      }
      clickstreamState.remove(id);
    }
}}
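
If you want to reason about the three possible outcomes (immediate join, late join within the grace period, DLQ) without starting a Flink job, the buffering logic above can be simulated in plain Java. This is only a sketch and not Flink code: the class GraceJoinSim, its methods onLookup, onClick and advanceTo, and the string payloads are all hypothetical names made up for the illustration, and a TreeMap stands in for Flink's timer service.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java simulation of "buffer the click, retry when a timer fires". Not Flink code. */
final class GraceJoinSim {
    static final long GRACE_MS = 1000;

    // plays the role of `map` (lookup rows) in the function above
    private final Map<Long, String> lookups = new HashMap<>();
    // plays the role of `clickstreamState` (clicks waiting for their lookup row)
    private final Map<Long, String> pendingClicks = new HashMap<>();
    // stand-in for the timer service: fire time -> ids to re-check
    private final TreeMap<Long, List<Long>> timers = new TreeMap<>();

    final List<String> joined = new ArrayList<>(); // main output
    final List<String> dlq = new ArrayList<>();    // side output

    /** Counterpart of processElement1: store the lookup row and flush a waiting click. */
    void onLookup(long id, String lookup) {
        lookups.putIfAbsent(id, lookup);
        String click = pendingClicks.remove(id);
        if (click != null) {
            joined.add(click + "+" + lookup);
        }
    }

    /** Counterpart of processElement2: join now, or buffer and schedule a re-check. */
    void onClick(long now, long id, String click) {
        String lookup = lookups.get(id);
        if (lookup != null) {
            joined.add(click + "+" + lookup);
            return;
        }
        pendingClicks.put(id, click);
        timers.computeIfAbsent(now + GRACE_MS, t -> new ArrayList<>()).add(id);
    }

    /** Counterpart of onTimer: fire every timer that is due at `now`. */
    void advanceTo(long now) {
        while (!timers.isEmpty() && timers.firstKey() <= now) {
            for (long id : timers.pollFirstEntry().getValue()) {
                String click = pendingClicks.remove(id);
                if (click == null) {
                    continue; // the click was already joined when its lookup row arrived
                }
                String lookup = lookups.get(id);
                if (lookup != null) {
                    joined.add(click + "+" + lookup);
                } else {
                    dlq.add(click); // still no lookup row after the grace period
                }
            }
        }
    }

    public static void main(String[] args) {
        GraceJoinSim sim = new GraceJoinSim();
        sim.onLookup(1L, "lookup-1");
        sim.onClick(0, 1L, "click-a");   // lookup row already present: joins immediately
        sim.onClick(0, 2L, "click-b");   // buffered, timer set for t=1000
        sim.onLookup(2L, "lookup-2");    // arrives within the grace period: joins
        sim.onClick(100, 3L, "click-c"); // buffered, lookup row never arrives
        sim.advanceTo(1100);
        System.out.println("joined=" + sim.joined + " dlq=" + sim.dlq);
    }
}
```

In this run only click-c ends up in the DLQ list. Note that the timer for id 2 still fires after the click has been joined, which is why the re-check has to tolerate a missing buffered click; the same applies to onTimer in the Flink function.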