我想用数据流 api 替换数据集 api
I want to replace dataset api with datstream api
我的flink版本是1.13
dataSet.first(limit).print();
如何用数据流替换它api?
在 DataSet 的情况下,数据是有限的,而对于 DataStream,元素的数量可以是无限的。我想你已经知道了,所以当你读取有时无序连续到达的数据时,.first(n)
元素的概念是不一样的。
如果您不能使用 Flink SQL,那么您可以编写自己的 limit
运算符,例如(警告,未经测试!):
public class LimitFilter<T> extends RichFilterFunction<T> {
private int _limit;
private transient int _remainingRecords;
public LimitFilter(int limit) {
_limit = limit;
}
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
// Calc number of records to return from this subtask
int mySubtask = getRuntimeContext().getIndexOfThisSubtask();
int remainingItems = _limit;
int parallelism = getRuntimeContext().getNumberOfParallelSubtasks();
for (int i = 0; i < parallelism; i++) {
int remainingGroups = parallelism - i;
int itemsInGroup = remainingItems / remainingGroups;
if (i == mySubtask) {
_remainingRecords = itemsInGroup;
break;
}
remainingItems -= itemsInGroup;
}
}
@Override
public boolean filter(T value) throws Exception {
if (_remainingRecords <= 0) {
return false;
}
_remainingRecords--;
return true;
}
}
如果您在运算符之间有一些合理随机的数据分布,这仅适用于并行度 > 1,例如在 .filter(new LimitFilter(limit))
运算符之前使用 rebalance()
。
我的flink版本是1.13
dataSet.first(limit).print();
如何用数据流替换它api?
在 DataSet 的情况下,数据是有限的,而对于 DataStream,元素的数量可以是无限的。我想你已经知道了,所以当你读取有时无序连续到达的数据时,.first(n)
元素的概念是不一样的。
如果您不能使用 Flink SQL,那么您可以编写自己的 limit
运算符,例如(警告,未经测试!):
public class LimitFilter<T> extends RichFilterFunction<T> {
private int _limit;
private transient int _remainingRecords;
public LimitFilter(int limit) {
_limit = limit;
}
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
// Calc number of records to return from this subtask
int mySubtask = getRuntimeContext().getIndexOfThisSubtask();
int remainingItems = _limit;
int parallelism = getRuntimeContext().getNumberOfParallelSubtasks();
for (int i = 0; i < parallelism; i++) {
int remainingGroups = parallelism - i;
int itemsInGroup = remainingItems / remainingGroups;
if (i == mySubtask) {
_remainingRecords = itemsInGroup;
break;
}
remainingItems -= itemsInGroup;
}
}
@Override
public boolean filter(T value) throws Exception {
if (_remainingRecords <= 0) {
return false;
}
_remainingRecords--;
return true;
}
}
如果您在运算符之间有一些合理随机的数据分布,这仅适用于并行度 > 1,例如在 .filter(new LimitFilter(limit))
运算符之前使用 rebalance()
。