Hazelcast Jet Cluster 进程重复

Hazlecast Jet Cluster Processes duplicates

我已经部署了 3 个 spring 嵌入了 Hazelcast Jet 的引导应用程序。节点相互识别并 运行 作为一个集群。我有以下代码:从 CSV 中简单读取并写入文件。但是 Jet 将重复项写入文件接收器。准确地说,Jet 处理 CSV 中的总条目数乘以节点数。因此,如果我在源和 3 个节点中有 10 个条目,我会在接收器中看到 3 个文件,每个文件都包含全部 10 个条目。我只想处理一次记录一次。以下是我的代码:

    Pipeline p = Pipeline.create();

    BatchSource<List<String>> source = Sources.filesBuilder("files")
            .glob("*.csv")
            .build(path -> Files.lines(path).skip(1).map(line -> split(line)));

    p.readFrom(source)
            .writeTo(Sinks.filesBuilder("out").build());
    instance.newJob(p).join();

如果是共享文件系统,则FilesourceBuilder中的sharedFileSystem属性必须设置为true