我想用数据流 api 替换数据集 api

I want to replace dataset api with datstream api

我的flink版本是1.13

dataSet.first(limit).print();

如何用数据流替换它api?

在 DataSet 的情况下,数据是有限的,而对于 DataStream,元素的数量可以是无限的。我想你已经知道了,所以当你读取有时无序连续到达的数据时,.first(n) 元素的概念是不一样的。

如果您不能使用 Flink SQL,那么您可以编写自己的 limit 运算符,例如(警告,未经测试!):

public class LimitFilter<T> extends RichFilterFunction<T> {

    private int _limit;
    
    private transient int _remainingRecords;
    
    public LimitFilter(int limit) {
        _limit = limit;
    }
    
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        
        // Calc number of records to return from this subtask
        int mySubtask = getRuntimeContext().getIndexOfThisSubtask();
        int remainingItems = _limit;
        int parallelism = getRuntimeContext().getNumberOfParallelSubtasks();
        for (int i = 0; i < parallelism; i++) {
            int remainingGroups = parallelism - i;
            int itemsInGroup = remainingItems / remainingGroups;
            if (i == mySubtask) {
                _remainingRecords = itemsInGroup;
                break;
            }
            
            remainingItems -= itemsInGroup;
        }
    }
    
    @Override
    public boolean filter(T value) throws Exception {
        if (_remainingRecords <= 0) {
            return false;
        }
        
        _remainingRecords--;
        return true;
    }

}

如果您在运算符之间有一些合理随机的数据分布,这仅适用于并行度 > 1,例如在 .filter(new LimitFilter(limit)) 运算符之前使用 rebalance()