任务管理器正在乱序接收消息
Task Manager is receiving messages out of order
我正在从文件中读取消息并在写入 Sink 之前应用一些运算符,注意到任务管理器正在获取给定键的乱序消息。如何解决这个问题?我做错了什么吗?请检查我的源文件格式和下面的代码。谢谢!
customer_id - timestamp - event_seq
1 t1 e1
1 t2 e2
2 t1 e1
2 t2 e2
1 t3 e3
DataStream<String> source = createTextFileSourceFromConfig(env);
source.map(new MapFunction<String, JSONObject>() {
@Override
public JSONObject map(String s) throws Exception {
return new JSONObject(s);
}
}).keyBy(new KeySelector<JSONObject, String>() {
@Override
public String getKey(JSONObject event) {
return event.get("id").toString();
}
}).filter(new InitialFilter())...
在上面的示例中,任务管理器正在接收 <1 t3 e3> 作为 id=1 的第一条消息。感谢您的建议。
如果读取并行度 > 1,则无法保证下游操作员接收记录的顺序,因为输入文件可以在并行执行的源之间拆分。因此,您可以让文件最后一个块的源在第一个块之前开始发出行。
我正在从文件中读取消息并在写入 Sink 之前应用一些运算符,注意到任务管理器正在获取给定键的乱序消息。如何解决这个问题?我做错了什么吗?请检查我的源文件格式和下面的代码。谢谢!
customer_id - timestamp - event_seq
1 t1 e1
1 t2 e2
2 t1 e1
2 t2 e2
1 t3 e3
DataStream<String> source = createTextFileSourceFromConfig(env);
source.map(new MapFunction<String, JSONObject>() {
@Override
public JSONObject map(String s) throws Exception {
return new JSONObject(s);
}
}).keyBy(new KeySelector<JSONObject, String>() {
@Override
public String getKey(JSONObject event) {
return event.get("id").toString();
}
}).filter(new InitialFilter())...
在上面的示例中,任务管理器正在接收 <1 t3 e3> 作为 id=1 的第一条消息。感谢您的建议。
如果读取并行度 > 1,则无法保证下游操作员接收记录的顺序,因为输入文件可以在并行执行的源之间拆分。因此,您可以让文件最后一个块的源在第一个块之前开始发出行。