Hazelcast Jet Cluster 进程重复
Hazlecast Jet Cluster Processes duplicates
我已经部署了 3 个 spring 嵌入了 Hazelcast Jet 的引导应用程序。节点相互识别并 运行 作为一个集群。我有以下代码:从 CSV 中简单读取并写入文件。但是 Jet 将重复项写入文件接收器。准确地说,Jet 处理 CSV 中的总条目数乘以节点数。因此,如果我在源和 3 个节点中有 10 个条目,我会在接收器中看到 3 个文件,每个文件都包含全部 10 个条目。我只想处理一次记录一次。以下是我的代码:
Pipeline p = Pipeline.create();
BatchSource<List<String>> source = Sources.filesBuilder("files")
.glob("*.csv")
.build(path -> Files.lines(path).skip(1).map(line -> split(line)));
p.readFrom(source)
.writeTo(Sinks.filesBuilder("out").build());
instance.newJob(p).join();
如果是共享文件系统,则FilesourceBuilder
中的sharedFileSystem
属性必须设置为true
。
我已经部署了 3 个 spring 嵌入了 Hazelcast Jet 的引导应用程序。节点相互识别并 运行 作为一个集群。我有以下代码:从 CSV 中简单读取并写入文件。但是 Jet 将重复项写入文件接收器。准确地说,Jet 处理 CSV 中的总条目数乘以节点数。因此,如果我在源和 3 个节点中有 10 个条目,我会在接收器中看到 3 个文件,每个文件都包含全部 10 个条目。我只想处理一次记录一次。以下是我的代码:
Pipeline p = Pipeline.create();
BatchSource<List<String>> source = Sources.filesBuilder("files")
.glob("*.csv")
.build(path -> Files.lines(path).skip(1).map(line -> split(line)));
p.readFrom(source)
.writeTo(Sinks.filesBuilder("out").build());
instance.newJob(p).join();
如果是共享文件系统,则FilesourceBuilder
中的sharedFileSystem
属性必须设置为true
。