使用基于计数的 window 加入两个流
Join two streams using a count-based window
我是 Flink Streaming 的新手 API,我想完成以下简单的 (IMO) 任务。我有两个流,我想使用基于计数的 windows 加入它们。到目前为止我的代码如下:
public class BaselineCategoryEquiJoin {
private static final String recordFile = "some_file.txt";
private static class ParseRecordFunction implements MapFunction<String, Tuple2<String[], MyRecord>> {
public Tuple2<String[], MyRecord> map(String s) throws Exception {
MyRecord myRecord = parse(s);
return new Tuple2<String[], myRecord>(myRecord.attributes, myRecord);
}
}
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment environment = StreamExecutionEnvironment.createLocalEnvironment();
ExecutionConfig config = environment.getConfig();
config.setParallelism(8);
DataStream<Tuple2<String[], MyRecord>> dataStream = environment.readTextFile(recordFile)
.map(new ParseRecordFunction());
DataStream<Tuple2<String[], MyRecord>> dataStream1 = environment.readTextFile(recordFile)
.map(new ParseRecordFunction());
DataStreamSink<Tuple2<String[], String[]>> joinedStream = dataStream1
.join(dataStream)
.where(new KeySelector<Tuple2<String[],MyRecord>, String[]>() {
public String[] getKey(Tuple2<String[], MyRecord> recordTuple2) throws Exception {
return recordTuple2.f0;
}
}).equalTo(new KeySelector<Tuple2<String[], MyRecord>, String[]>() {
public String[] getKey(Tuple2<String[], MyRecord> recordTuple2) throws Exception {
return recordTuple2.f0;
}
}).window(TumblingProcessingTimeWindows.of(Time.seconds(1)))
.apply(new JoinFunction<Tuple2<String[],MyRecord>, Tuple2<String[],MyRecord>, Tuple2<String[], String[]>>() {
public Tuple2<String[], String[]> join(Tuple2<String[], MyRecord> tuple1, Tuple2<String[], MyRecord> tuple2) throws Exception {
return new Tuple2<String[], String[]>(tuple1.f0, tuple1.f0);
}
}).print();
environment.execute();
}
}
我的代码没有错误,但没有产生任何结果。事实上,对 apply
方法的调用从未被调用(通过在调试模式下添加断点来验证)。我认为,前一个的主要原因是我的数据没有时间属性。因此,窗口化(通过 window
实现)没有正确完成。因此,我的问题是如何表明我希望我的加入基于 count-windows 发生。例如,我希望连接实现每个流中的每 100 个元组。之前在Flink中可行吗?如果是,我应该在我的代码中更改什么来实现它。
此时,我必须通知你,我尝试调用了countWindow()
方法,但是由于某些原因Flink的JoinedStreams
没有提供。
谢谢
不支持基于计数的联接。您可以通过使用 "event-time" 语义来模拟基于计数的 windows,并将唯一的 seq-id 作为时间戳应用于每条记录。因此,time-window of "5" 实际上是 count-window of 5.
我是 Flink Streaming 的新手 API,我想完成以下简单的 (IMO) 任务。我有两个流,我想使用基于计数的 windows 加入它们。到目前为止我的代码如下:
public class BaselineCategoryEquiJoin {
private static final String recordFile = "some_file.txt";
private static class ParseRecordFunction implements MapFunction<String, Tuple2<String[], MyRecord>> {
public Tuple2<String[], MyRecord> map(String s) throws Exception {
MyRecord myRecord = parse(s);
return new Tuple2<String[], myRecord>(myRecord.attributes, myRecord);
}
}
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment environment = StreamExecutionEnvironment.createLocalEnvironment();
ExecutionConfig config = environment.getConfig();
config.setParallelism(8);
DataStream<Tuple2<String[], MyRecord>> dataStream = environment.readTextFile(recordFile)
.map(new ParseRecordFunction());
DataStream<Tuple2<String[], MyRecord>> dataStream1 = environment.readTextFile(recordFile)
.map(new ParseRecordFunction());
DataStreamSink<Tuple2<String[], String[]>> joinedStream = dataStream1
.join(dataStream)
.where(new KeySelector<Tuple2<String[],MyRecord>, String[]>() {
public String[] getKey(Tuple2<String[], MyRecord> recordTuple2) throws Exception {
return recordTuple2.f0;
}
}).equalTo(new KeySelector<Tuple2<String[], MyRecord>, String[]>() {
public String[] getKey(Tuple2<String[], MyRecord> recordTuple2) throws Exception {
return recordTuple2.f0;
}
}).window(TumblingProcessingTimeWindows.of(Time.seconds(1)))
.apply(new JoinFunction<Tuple2<String[],MyRecord>, Tuple2<String[],MyRecord>, Tuple2<String[], String[]>>() {
public Tuple2<String[], String[]> join(Tuple2<String[], MyRecord> tuple1, Tuple2<String[], MyRecord> tuple2) throws Exception {
return new Tuple2<String[], String[]>(tuple1.f0, tuple1.f0);
}
}).print();
environment.execute();
}
}
我的代码没有错误,但没有产生任何结果。事实上,对 apply
方法的调用从未被调用(通过在调试模式下添加断点来验证)。我认为,前一个的主要原因是我的数据没有时间属性。因此,窗口化(通过 window
实现)没有正确完成。因此,我的问题是如何表明我希望我的加入基于 count-windows 发生。例如,我希望连接实现每个流中的每 100 个元组。之前在Flink中可行吗?如果是,我应该在我的代码中更改什么来实现它。
此时,我必须通知你,我尝试调用了countWindow()
方法,但是由于某些原因Flink的JoinedStreams
没有提供。
谢谢
不支持基于计数的联接。您可以通过使用 "event-time" 语义来模拟基于计数的 windows,并将唯一的 seq-id 作为时间戳应用于每条记录。因此,time-window of "5" 实际上是 count-window of 5.