Flink 使用 KeyedCoProcessFunction 连接流
Flink connect streams using KeyedCoProcessFunction
对于 1:1 连接(join),我使用的是 KeyedCoProcessFunction。我有两个流:查找流(每秒 100 条记录)和点击流(每秒 10000 条记录)。在 processElement2
方法中,我在 MapState<Long,Row>
中查找键:如果找到,就用它来丰富点击流数据;否则把这条记录发送到侧输出,再把侧输出写入(sink)Kafka。我没有在两个输入流上使用任何窗口(window)。在 Kafka 的 DLQ 主题中,我看到持续以每秒 1-2 条的速度产生记录。我怎样才能在 processElement2
方法中先等待几毫秒再查找 id,然后才把记录推送到侧输出?
// Key the lookup stream by its "id" field and the click stream by its "lookupid" field.
val keyedLookups = lookDataStream.keyBy(lookup -> lookup.<Long>getFieldAs("id"));
val keyedClicks = clickDataStream.keyBy(click -> click.<Long>getFieldAs("lookupid"));
// Connect the two keyed streams and run the enrichment function over the pair.
val joinStream = keyedLookups.connect(keyedClicks).process(new EnrichJoinFunction());
/**
 * Enriches click rows (input 2) with lookup rows (input 1).
 *
 * <p>Lookup rows are cached in map state. A click row whose id has a cached lookup row is joined
 * and emitted on the main output; any other click row is emitted to the {@code "side-output"}
 * tag immediately.
 */
public static class EnrichJoinFunction
    extends KeyedCoProcessFunction<Long, Row, Row, Row> {

  /** Side output receiving click rows for which no lookup row is cached. */
  final OutputTag<Row> outputTag = new OutputTag<Row>("side-output") {};

  /** Cached lookup rows by id; assigned in {@code open}. */
  private MapState<Long, Row> map = null;

  /** Registers the lookup map state ("state") with a 15-day time-to-live. */
  @Override
  public void open(Configuration parameters) throws Exception {
    final MapStateDescriptor<Long, Row> lookupDescriptor =
        new MapStateDescriptor<Long, Row>(
            "state",
            TypeInformation.of(Long.class),
            TypeInformation.of(new TypeHint<Row>() {}));
    lookupDescriptor.enableTimeToLive(StateTtlConfig.newBuilder(Time.days(15)).build());
    /*MapStateDescriptor.setQueryable("test");*/
    map = getRuntimeContext().getMapState(lookupDescriptor);
  }

  /**
   * Caches a lookup row under its "id" field.
   *
   * @param lookupRow the incoming lookup row
   * @param ctx function context (unused here)
   * @param out main output (unused here)
   */
  @Override
  public void processElement1(
      Row lookupRow, KeyedCoProcessFunction<Long, Row, Row, Row>.Context ctx, Collector<Row> out)
      throws Exception {
    log.debug("Received Lookup Record" + RowUtils.printRow(lookupRow));
    final Long lookupId = lookupRow.<Long>getFieldAs("id");
    // The first row stored for an id is kept; later rows for the same id are ignored.
    if (map.contains(lookupId)) {
      return;
    }
    map.put(lookupId, lookupRow);
  }

  /**
   * Joins a click row with its cached lookup row, or sends it to the side output.
   *
   * @param clickRow the incoming click row
   * @param ctx function context, used for the side output
   * @param out main output receiving joined rows
   */
  @Override
  public void processElement2(
      Row clickRow, KeyedCoProcessFunction<Long, Row, Row, Row>.Context ctx, Collector<Row> out)
      throws Exception {
    log.debug("Received Click stream Record" + RowUtils.printRow(clickRow));
    // NOTE(review): this reads "id" from the click row; confirm it is the same field the click
    // stream is keyed by, otherwise the lookup below cannot match.
    final Long clickId = clickRow.<Long>getFieldAs("id");
    if (!map.contains(clickId)) {
      // lookup entry not yet arrived, send it to side output - dlq
      ctx.output(outputTag, clickRow);
      return;
    }
    // enrich join
    final Row enriched = join(clickRow, map.get(clickId));
    out.collect(enriched);
  }

  /**
   * Builds the joined output row.
   *
   * @param clickRow the click row
   * @param lookupRow the matching lookup row
   * @return a new 13-field INSERT row
   */
  public Row join(Row clickRow, Row lookupRow) throws ParseException {
    final Row enriched = new Row(RowKind.INSERT, 13);
    // row setters for the joined output go here (not shown in this snippet)
    return enriched;
  }
}}
您可以使用 TimerService 来实现。
因此,思路是:把没有立即匹配到查找数据的点击流记录(行)存储在专用的 MapState<Long,Row>
中,并注册一个处理时间或事件时间计时器(processingTimeTimer/eventTimeTimer),
该计时器将在一段时间后触发。在计时器回调中,您可以再次尝试连接查找数据和点击流数据。如果仍然找不到匹配项,则最终将此点击事件发送到侧输出。
它可能如下所示:
public static class EnrichJoinFunction
extends KeyedCoProcessFunction<Long, Row, Row, Row> {
final OutputTag<Row> outputTag = new OutputTag<Row>("side-output") {};
private MapState<Long, Row> map = null;
private MapState<Long, Row> clickstreamState = null;
@Override
public void open(Configuration parameters) throws Exception {
MapStateDescriptor<Long, Row> MapStateDescriptor =
new MapStateDescriptor<Long, Row>(
"state",
TypeInformation.of(Long.class),
TypeInformation.of(new TypeHint<Row>() {}));
MapStateDescriptor.enableTimeToLive(StateTtlConfig.newBuilder(Time.days(15)).build());
/*MapStateDescriptor.setQueryable("test");*/
map = getRuntimeContext().getMapState(MapStateDescriptor);
MapStateDescriptor<Long, Row> clickstreamStateMapStateDescriptor =
new MapStateDescriptor<Long, Row>(
"clickstreamState",
TypeInformation.of(Long.class),
TypeInformation.of(new TypeHint<Row>() {}));
clickstreamState MapStateDescriptor.enableTimeToLive(StateTtlConfig.newBuilder(Time.minutes(1)).build());
clickstreamState = getRuntimeContext().getMapState(clickstreamStateMapStateDescriptor);
}
@Override
public void processElement1(
Row lookupRow, KeyedCoProcessFunction<Long, Row, Row, Row>.Context ctx, Collector<Row> out)
throws Exception {
log.debug("Received Lookup Record" + RowUtils.printRow(lookupRow));
Long id = lookupRow.<Long>getFieldAs("id");
if (!map.contains(id)) {
map.put(id, lookupRow);
}
// join immediately any matching click events, waiting for counterpart
if (clickstreamState.contains(id)) {
// enrich join
Row joinRow = join(clickstreamState.get(id), lookupRow);
out.collect(joinRow);
clickstreamState.remove(id)
}
}
@Override
public void processElement2(
Row clickRow, KeyedCoProcessFunction<Long, Row, Row, Row>.Context ctx, Collector<Row> out)
throws Exception {
log.debug("Received Click stream Record" + RowUtils.printRow(clickRow));
Long id = clickRow.<Long>getFieldAs("id");
if (map.contains(id)) {
// enrich join
Row joinRow = join(clickRow, map.get(id));
out.collect(joinRow);
} else {
// put in state and check in 1 second
clickstreamState.put(id, clickRow)
Long currTimestamp = ctx.timestamp()
ctx.timerService().registerProcessingTimeTimer(currTimestamp + 1000)
}
}
public Row join(Row clickRow, Row lookupRow) throws ParseException {
Row joinedRow = new Row(RowKind.INSERT, 13);
// row setter join ouput
return joinedRow;
}
@Override
public void onTimer(
Long timestamp,
KeyedCoProcessFunction<Long, Row, Row, Row>.Context ctx, Collector<Row> out
) {
Long id = ctx.getCurrentKey
Row clickRow = clickstreamState.get(id)
if (map.contains(id)) {
// enrich join
val joinRow = join(clickRow, map.get(id));
out.collect(joinRow);
} else {
// lookup entry not arrived even in 1 second, send it to side output - dlq
ctx.output(outputTag, clickRow);
}
clickstreamState.remove(id)
}
}}
对于 1:1 连接(join),我使用的是 KeyedCoProcessFunction。我有两个流:查找流(每秒 100 条记录)和点击流(每秒 10000 条记录)。在 processElement2
方法中,我在 MapState<Long,Row>
中查找键:如果找到,就用它来丰富点击流数据;否则把这条记录发送到侧输出,再把侧输出写入(sink)Kafka。我没有在两个输入流上使用任何窗口(window)。在 Kafka 的 DLQ 主题中,我看到持续以每秒 1-2 条的速度产生记录。我怎样才能在 processElement2
方法中先等待几毫秒再查找 id,然后才把记录推送到侧输出?
// Key the lookup stream by its "id" field and the click stream by its "lookupid" field.
val keyedLookups = lookDataStream.keyBy(lookup -> lookup.<Long>getFieldAs("id"));
val keyedClicks = clickDataStream.keyBy(click -> click.<Long>getFieldAs("lookupid"));
// Connect the two keyed streams and run the enrichment function over the pair.
val joinStream = keyedLookups.connect(keyedClicks).process(new EnrichJoinFunction());
/**
 * Enriches click rows (input 2) with lookup rows (input 1).
 *
 * <p>Lookup rows are cached in map state. A click row whose id has a cached lookup row is joined
 * and emitted on the main output; any other click row is emitted to the {@code "side-output"}
 * tag immediately.
 */
public static class EnrichJoinFunction
    extends KeyedCoProcessFunction<Long, Row, Row, Row> {

  /** Side output receiving click rows for which no lookup row is cached. */
  final OutputTag<Row> outputTag = new OutputTag<Row>("side-output") {};

  /** Cached lookup rows by id; assigned in {@code open}. */
  private MapState<Long, Row> map = null;

  /** Registers the lookup map state ("state") with a 15-day time-to-live. */
  @Override
  public void open(Configuration parameters) throws Exception {
    final MapStateDescriptor<Long, Row> lookupDescriptor =
        new MapStateDescriptor<Long, Row>(
            "state",
            TypeInformation.of(Long.class),
            TypeInformation.of(new TypeHint<Row>() {}));
    lookupDescriptor.enableTimeToLive(StateTtlConfig.newBuilder(Time.days(15)).build());
    /*MapStateDescriptor.setQueryable("test");*/
    map = getRuntimeContext().getMapState(lookupDescriptor);
  }

  /**
   * Caches a lookup row under its "id" field.
   *
   * @param lookupRow the incoming lookup row
   * @param ctx function context (unused here)
   * @param out main output (unused here)
   */
  @Override
  public void processElement1(
      Row lookupRow, KeyedCoProcessFunction<Long, Row, Row, Row>.Context ctx, Collector<Row> out)
      throws Exception {
    log.debug("Received Lookup Record" + RowUtils.printRow(lookupRow));
    final Long lookupId = lookupRow.<Long>getFieldAs("id");
    // The first row stored for an id is kept; later rows for the same id are ignored.
    if (map.contains(lookupId)) {
      return;
    }
    map.put(lookupId, lookupRow);
  }

  /**
   * Joins a click row with its cached lookup row, or sends it to the side output.
   *
   * @param clickRow the incoming click row
   * @param ctx function context, used for the side output
   * @param out main output receiving joined rows
   */
  @Override
  public void processElement2(
      Row clickRow, KeyedCoProcessFunction<Long, Row, Row, Row>.Context ctx, Collector<Row> out)
      throws Exception {
    log.debug("Received Click stream Record" + RowUtils.printRow(clickRow));
    // NOTE(review): this reads "id" from the click row; confirm it is the same field the click
    // stream is keyed by, otherwise the lookup below cannot match.
    final Long clickId = clickRow.<Long>getFieldAs("id");
    if (!map.contains(clickId)) {
      // lookup entry not yet arrived, send it to side output - dlq
      ctx.output(outputTag, clickRow);
      return;
    }
    // enrich join
    final Row enriched = join(clickRow, map.get(clickId));
    out.collect(enriched);
  }

  /**
   * Builds the joined output row.
   *
   * @param clickRow the click row
   * @param lookupRow the matching lookup row
   * @return a new 13-field INSERT row
   */
  public Row join(Row clickRow, Row lookupRow) throws ParseException {
    final Row enriched = new Row(RowKind.INSERT, 13);
    // row setters for the joined output go here (not shown in this snippet)
    return enriched;
  }
}}
您可以使用 TimerService 来实现。
因此,思路是:把没有立即匹配到查找数据的点击流记录(行)存储在专用的 MapState<Long,Row>
中,并注册一个处理时间或事件时间计时器(processingTimeTimer/eventTimeTimer),
该计时器将在一段时间后触发。在计时器回调中,您可以再次尝试连接查找数据和点击流数据。如果仍然找不到匹配项,则最终将此点击事件发送到侧输出。
它可能如下所示:
public static class EnrichJoinFunction
extends KeyedCoProcessFunction<Long, Row, Row, Row> {
final OutputTag<Row> outputTag = new OutputTag<Row>("side-output") {};
private MapState<Long, Row> map = null;
private MapState<Long, Row> clickstreamState = null;
@Override
public void open(Configuration parameters) throws Exception {
MapStateDescriptor<Long, Row> MapStateDescriptor =
new MapStateDescriptor<Long, Row>(
"state",
TypeInformation.of(Long.class),
TypeInformation.of(new TypeHint<Row>() {}));
MapStateDescriptor.enableTimeToLive(StateTtlConfig.newBuilder(Time.days(15)).build());
/*MapStateDescriptor.setQueryable("test");*/
map = getRuntimeContext().getMapState(MapStateDescriptor);
MapStateDescriptor<Long, Row> clickstreamStateMapStateDescriptor =
new MapStateDescriptor<Long, Row>(
"clickstreamState",
TypeInformation.of(Long.class),
TypeInformation.of(new TypeHint<Row>() {}));
clickstreamState MapStateDescriptor.enableTimeToLive(StateTtlConfig.newBuilder(Time.minutes(1)).build());
clickstreamState = getRuntimeContext().getMapState(clickstreamStateMapStateDescriptor);
}
@Override
public void processElement1(
Row lookupRow, KeyedCoProcessFunction<Long, Row, Row, Row>.Context ctx, Collector<Row> out)
throws Exception {
log.debug("Received Lookup Record" + RowUtils.printRow(lookupRow));
Long id = lookupRow.<Long>getFieldAs("id");
if (!map.contains(id)) {
map.put(id, lookupRow);
}
// join immediately any matching click events, waiting for counterpart
if (clickstreamState.contains(id)) {
// enrich join
Row joinRow = join(clickstreamState.get(id), lookupRow);
out.collect(joinRow);
clickstreamState.remove(id)
}
}
@Override
public void processElement2(
Row clickRow, KeyedCoProcessFunction<Long, Row, Row, Row>.Context ctx, Collector<Row> out)
throws Exception {
log.debug("Received Click stream Record" + RowUtils.printRow(clickRow));
Long id = clickRow.<Long>getFieldAs("id");
if (map.contains(id)) {
// enrich join
Row joinRow = join(clickRow, map.get(id));
out.collect(joinRow);
} else {
// put in state and check in 1 second
clickstreamState.put(id, clickRow)
Long currTimestamp = ctx.timestamp()
ctx.timerService().registerProcessingTimeTimer(currTimestamp + 1000)
}
}
public Row join(Row clickRow, Row lookupRow) throws ParseException {
Row joinedRow = new Row(RowKind.INSERT, 13);
// row setter join ouput
return joinedRow;
}
@Override
public void onTimer(
Long timestamp,
KeyedCoProcessFunction<Long, Row, Row, Row>.Context ctx, Collector<Row> out
) {
Long id = ctx.getCurrentKey
Row clickRow = clickstreamState.get(id)
if (map.contains(id)) {
// enrich join
val joinRow = join(clickRow, map.get(id));
out.collect(joinRow);
} else {
// lookup entry not arrived even in 1 second, send it to side output - dlq
ctx.output(outputTag, clickRow);
}
clickstreamState.remove(id)
}
}}