Hazelcast Jet 查询
Hazelcast Jet Query
我有以下关于 Hazelcast Jet 的查询
用例如下
有一个应用程序(应用程序 'A',部署在集群中)使用 Hazelcast IMDG 并将数百万条记录/交易放在 hazelcast IMap 中。
已为此 IMap 配置事件日志。
还有另一个应用程序(应用程序 B,部署在集群中)实例化 JetInstance 并 运行 在每个节点上单独执行作业以处理记录。
目前,此作业从事件日志中读取数据并添加到 IList(参考 - hazelcast-jet-0.5.1\code-samples\streaming\map-journal-source\src\main\java\RemoteMapJournalSource.java)
由于作业 运行 在多个节点上运行,事件日志中的记录由多个节点处理。这导致 IList 中有多个条目。
是否可以确保一条记录仅由 'Application B' 的一个节点处理,而不会被其他节点处理以避免重复?
如果不是,这是否意味着 运行 作业将由 'Application B' 集群的单个节点完成?
这是示例代码(应用程序 B)
Pipeline p = Pipeline.create();
p.drawFrom(Sources.<Integer, Integer, Integer>remoteMapJournal(MAP_NAME, clientConfig,
e -> e.getType() == EntryEventType.ADDED, EventJournalMapEvent::getNewValue, true))
.peek()
.drainTo(Sinks.list(SINK_NAME));
JobConfig jc= new JobConfig();
jc.setProcessingGuarantee(ProcessingGuarantee.EXACTLY_ONCE);
localJet.newJob(p,jc);
这里是完整的代码。
应用程序 A 源代码。
public class RemoteMapJournalSourceSrv1 {
private static final String MAP_NAME = "map";
private static final String SINK_NAME = "list";
public static void main(String[] args) throws Exception {
System.setProperty("remoteHz.logging.type", "log4j");
Config hzConfig = getConfig();
HazelcastInstance remoteHz = startRemoteHzCluster(hzConfig);
try {
IMap<Integer, Integer> map = remoteHz.getMap(MAP_NAME);
System.out.println("*************** Initial Map address " + map.size() );
while(true) {
System.out.println("***************map size "+map.size());
TimeUnit.SECONDS.sleep(20);
}
} finally {
Hazelcast.shutdownAll();
}
}
private static HazelcastInstance startRemoteHzCluster(Config config) {
HazelcastInstance remoteHz = Hazelcast.newHazelcastInstance(config);
return remoteHz;
}
private static Config getConfig() {
Config config = new Config();
// Add an event journal config for map which has custom capacity of 1000 (default 10_000)
// and time to live seconds as 10 seconds (default 0 which means infinite)
config.addEventJournalConfig(new EventJournalConfig().setEnabled(true)
.setMapName(MAP_NAME)
.setCapacity(10000)
.setTimeToLiveSeconds(100));
return config;
}
这是应用程序 B - 节点 1 示例代码
public class RemoteMapJournalSourceCL1 {
private static final String MAP_NAME = "map";
private static final String SINK_NAME = "list";
public static void main(String[] args) throws Exception {
System.setProperty("remoteHz.logging.type", "log4j");
JetInstance localJet = startLocalJetCluster();
try {
ClientConfig clientConfig = new ClientConfig();
GroupConfig groupConfig = new GroupConfig();
clientConfig.getNetworkConfig().addAddress("localhost:5701");
clientConfig.setGroupConfig(groupConfig);
IList list1 = localJet.getList(SINK_NAME);
int size1 = list1.size();
System.out.println("***************List Initial size "+size1);
Pipeline p = Pipeline.create();
p.drawFrom(Sources.<Integer, Integer, Integer>remoteMapJournal(MAP_NAME, clientConfig,
e -> e.getType() == EntryEventType.ADDED, EventJournalMapEvent::getNewValue, false))
.peek()
.drainTo(Sinks.list(SINK_NAME));
JobConfig jc= new JobConfig();
jc.setProcessingGuarantee(ProcessingGuarantee.EXACTLY_ONCE);
localJet.newJob(p,jc);
while(true){
TimeUnit.SECONDS.sleep(10);
System.out.println("***************Read " + list1.size() + " entries from remote map journal.");
}
} finally {
Hazelcast.shutdownAll();
Jet.shutdownAll();
}
}
private static String getAddress(HazelcastInstance remoteHz) {
Address address = remoteHz.getCluster().getLocalMember().getAddress();
System.out.println("***************Remote address " + address.getHost() + ":" + address.getPort() );
return address.getHost() + ":" + address.getPort();
}
private static JetInstance startLocalJetCluster() {
JetInstance localJet = Jet.newJetInstance();
return localJet;
}
这是应用程序 B - 节点 2 示例代码
public class RemoteMapJournalSourceCL2 {
private static final String MAP_NAME = "map";
private static final String SINK_NAME = "list";
public static void main(String[] args) throws Exception {
System.setProperty("remoteHz.logging.type", "log4j");
JetInstance localJet = startLocalJetCluster();
try {
ClientConfig clientConfig = new ClientConfig();
GroupConfig groupConfig = new GroupConfig();
clientConfig.getNetworkConfig().addAddress("localhost:5701");
clientConfig.setGroupConfig(groupConfig);
IList list1 = localJet.getList(SINK_NAME);
int size1 = list1.size();
System.out.println("***************List Initial size "+size1);
Pipeline p = Pipeline.create();
p.drawFrom(Sources.<Integer, Integer, Integer>remoteMapJournal(MAP_NAME, clientConfig,
e -> e.getType() == EntryEventType.ADDED, EventJournalMapEvent::getNewValue, true))
.peek()
.drainTo(Sinks.list(SINK_NAME));
JobConfig jc= new JobConfig();
jc.setProcessingGuarantee(ProcessingGuarantee.EXACTLY_ONCE);
localJet.newJob(p,jc);
while(true){
TimeUnit.SECONDS.sleep(10);
System.out.println("***************Read " + list1.size() + " entries from remote map journal.");
}
} finally {
Hazelcast.shutdownAll();
Jet.shutdownAll();
}
}
private static JetInstance startLocalJetCluster() {
JetInstance localJet = Jet.newJetInstance();
return localJet;
}
Hazelcast 客户端 - 将条目放入 Hazelcast 地图(应用程序 A)
public class HZClient {
public static void main(String[] args) {
ClientConfig clientConfig = new ClientConfig();
GroupConfig groupConfig = new GroupConfig();
clientConfig.getNetworkConfig().addAddress("localhost:5701");
clientConfig.setGroupConfig(groupConfig);
HazelcastInstance client = HazelcastClient.newHazelcastClient(clientConfig);
IMap<Integer, Integer> map = client.getMap("map");
Scanner in = new Scanner(System.in);
int startIndex= 0;
int endIndex= 0;
while(true) {
if(args !=null && args.length > 0 && args[0].equals("BATCH")) {
System.out.println("Please input the batch size");
int b = in.nextInt();
startIndex= endIndex + 1;
endIndex+= b;
System.out.println("Batch starts from "+ startIndex +"ends at"+endIndex);
putBatch(map,startIndex,endIndex);
}
else {
System.out.println("Please input the map entry");
int a = in.nextInt();
System.out.println("You entered integer "+a);
put(map,a,a);
}
}
}
public static void putBatch(IMap map,int startIndex, int endIndex) {
int index= startIndex;
System.out.println("Start Index" + startIndex +"End Index"+endIndex );
while(index<=endIndex){
System.out.println("Map Values"+ index);
put(map,index,index);
index+=1;
}
}
public static void put(IMap map,int key,int value) {
map.set(key, value);
}
执行此操作的步骤如下。
运行 应用程序 A - Java 程序 RemoteMapJournalSourceSrv1
运行 应用程序 B 节点 1 - Java 程序 RemoteMapJournalSourceCL1
运行 应用程序 B 节点 2 - Java 程序 RemoteMapJournalSourceCL2
运行 应用程序 A 的 Hazelcast 客户端 - Java 程序 HZClient
此客户端程序根据控制台输入将条目放入地图中。请提供整数输入。
观察结果
执行时,.peek() 记录应用程序 B 的两个节点的值,并且列表计数在应用程序 A 映射中插入 1 个条目时变为 2。
您似乎正在从两个 Jet 客户端提交两个独立的作业。每个作业接收所有 IMap 事件日志项目并将它们推送到同一个 IList,因此预期结果是 IList 包含每个项目的两个实例。
请记住,您仅从 Jet 客户端提交 作业,但它实际上在 Jet 集群内同时在其所有成员上运行。如果您只想要接收器中的一份数据副本,请不要提交相同的作业两次。
我有以下关于 Hazelcast Jet 的查询
用例如下
有一个应用程序(应用程序 'A',部署在集群中)使用 Hazelcast IMDG 并将数百万条记录/交易放在 hazelcast IMap 中。
已为此 IMap 配置事件日志。
还有另一个应用程序(应用程序 B,部署在集群中)实例化 JetInstance 并 运行 在每个节点上单独执行作业以处理记录。
目前,此作业从事件日志中读取数据并添加到 IList(参考 - hazelcast-jet-0.5.1\code-samples\streaming\map-journal-source\src\main\java\RemoteMapJournalSource.java)
由于作业 运行 在多个节点上运行,事件日志中的记录由多个节点处理。这导致 IList 中有多个条目。
是否可以确保一条记录仅由 'Application B' 的一个节点处理,而不会被其他节点处理以避免重复?
如果不是,这是否意味着 运行 作业将由 'Application B' 集群的单个节点完成?
这是示例代码(应用程序 B)
Pipeline p = Pipeline.create();
p.drawFrom(Sources.<Integer, Integer, Integer>remoteMapJournal(MAP_NAME, clientConfig,
e -> e.getType() == EntryEventType.ADDED, EventJournalMapEvent::getNewValue, true))
.peek()
.drainTo(Sinks.list(SINK_NAME));
JobConfig jc= new JobConfig();
jc.setProcessingGuarantee(ProcessingGuarantee.EXACTLY_ONCE);
localJet.newJob(p,jc);
这里是完整的代码。
应用程序 A 源代码。
public class RemoteMapJournalSourceSrv1 {
private static final String MAP_NAME = "map";
private static final String SINK_NAME = "list";
public static void main(String[] args) throws Exception {
System.setProperty("remoteHz.logging.type", "log4j");
Config hzConfig = getConfig();
HazelcastInstance remoteHz = startRemoteHzCluster(hzConfig);
try {
IMap<Integer, Integer> map = remoteHz.getMap(MAP_NAME);
System.out.println("*************** Initial Map address " + map.size() );
while(true) {
System.out.println("***************map size "+map.size());
TimeUnit.SECONDS.sleep(20);
}
} finally {
Hazelcast.shutdownAll();
}
}
private static HazelcastInstance startRemoteHzCluster(Config config) {
HazelcastInstance remoteHz = Hazelcast.newHazelcastInstance(config);
return remoteHz;
}
private static Config getConfig() {
Config config = new Config();
// Add an event journal config for map which has custom capacity of 1000 (default 10_000)
// and time to live seconds as 10 seconds (default 0 which means infinite)
config.addEventJournalConfig(new EventJournalConfig().setEnabled(true)
.setMapName(MAP_NAME)
.setCapacity(10000)
.setTimeToLiveSeconds(100));
return config;
}
这是应用程序 B - 节点 1 示例代码
public class RemoteMapJournalSourceCL1 {
private static final String MAP_NAME = "map";
private static final String SINK_NAME = "list";
public static void main(String[] args) throws Exception {
System.setProperty("remoteHz.logging.type", "log4j");
JetInstance localJet = startLocalJetCluster();
try {
ClientConfig clientConfig = new ClientConfig();
GroupConfig groupConfig = new GroupConfig();
clientConfig.getNetworkConfig().addAddress("localhost:5701");
clientConfig.setGroupConfig(groupConfig);
IList list1 = localJet.getList(SINK_NAME);
int size1 = list1.size();
System.out.println("***************List Initial size "+size1);
Pipeline p = Pipeline.create();
p.drawFrom(Sources.<Integer, Integer, Integer>remoteMapJournal(MAP_NAME, clientConfig,
e -> e.getType() == EntryEventType.ADDED, EventJournalMapEvent::getNewValue, false))
.peek()
.drainTo(Sinks.list(SINK_NAME));
JobConfig jc= new JobConfig();
jc.setProcessingGuarantee(ProcessingGuarantee.EXACTLY_ONCE);
localJet.newJob(p,jc);
while(true){
TimeUnit.SECONDS.sleep(10);
System.out.println("***************Read " + list1.size() + " entries from remote map journal.");
}
} finally {
Hazelcast.shutdownAll();
Jet.shutdownAll();
}
}
private static String getAddress(HazelcastInstance remoteHz) {
Address address = remoteHz.getCluster().getLocalMember().getAddress();
System.out.println("***************Remote address " + address.getHost() + ":" + address.getPort() );
return address.getHost() + ":" + address.getPort();
}
private static JetInstance startLocalJetCluster() {
JetInstance localJet = Jet.newJetInstance();
return localJet;
}
这是应用程序 B - 节点 2 示例代码
public class RemoteMapJournalSourceCL2 {
private static final String MAP_NAME = "map";
private static final String SINK_NAME = "list";
public static void main(String[] args) throws Exception {
System.setProperty("remoteHz.logging.type", "log4j");
JetInstance localJet = startLocalJetCluster();
try {
ClientConfig clientConfig = new ClientConfig();
GroupConfig groupConfig = new GroupConfig();
clientConfig.getNetworkConfig().addAddress("localhost:5701");
clientConfig.setGroupConfig(groupConfig);
IList list1 = localJet.getList(SINK_NAME);
int size1 = list1.size();
System.out.println("***************List Initial size "+size1);
Pipeline p = Pipeline.create();
p.drawFrom(Sources.<Integer, Integer, Integer>remoteMapJournal(MAP_NAME, clientConfig,
e -> e.getType() == EntryEventType.ADDED, EventJournalMapEvent::getNewValue, true))
.peek()
.drainTo(Sinks.list(SINK_NAME));
JobConfig jc= new JobConfig();
jc.setProcessingGuarantee(ProcessingGuarantee.EXACTLY_ONCE);
localJet.newJob(p,jc);
while(true){
TimeUnit.SECONDS.sleep(10);
System.out.println("***************Read " + list1.size() + " entries from remote map journal.");
}
} finally {
Hazelcast.shutdownAll();
Jet.shutdownAll();
}
}
private static JetInstance startLocalJetCluster() {
JetInstance localJet = Jet.newJetInstance();
return localJet;
}
Hazelcast 客户端 - 将条目放入 Hazelcast 地图(应用程序 A)
public class HZClient {
public static void main(String[] args) {
ClientConfig clientConfig = new ClientConfig();
GroupConfig groupConfig = new GroupConfig();
clientConfig.getNetworkConfig().addAddress("localhost:5701");
clientConfig.setGroupConfig(groupConfig);
HazelcastInstance client = HazelcastClient.newHazelcastClient(clientConfig);
IMap<Integer, Integer> map = client.getMap("map");
Scanner in = new Scanner(System.in);
int startIndex= 0;
int endIndex= 0;
while(true) {
if(args !=null && args.length > 0 && args[0].equals("BATCH")) {
System.out.println("Please input the batch size");
int b = in.nextInt();
startIndex= endIndex + 1;
endIndex+= b;
System.out.println("Batch starts from "+ startIndex +"ends at"+endIndex);
putBatch(map,startIndex,endIndex);
}
else {
System.out.println("Please input the map entry");
int a = in.nextInt();
System.out.println("You entered integer "+a);
put(map,a,a);
}
}
}
public static void putBatch(IMap map,int startIndex, int endIndex) {
int index= startIndex;
System.out.println("Start Index" + startIndex +"End Index"+endIndex );
while(index<=endIndex){
System.out.println("Map Values"+ index);
put(map,index,index);
index+=1;
}
}
public static void put(IMap map,int key,int value) {
map.set(key, value);
}
执行此操作的步骤如下。
运行 应用程序 A - Java 程序 RemoteMapJournalSourceSrv1
运行 应用程序 B 节点 1 - Java 程序 RemoteMapJournalSourceCL1
运行 应用程序 B 节点 2 - Java 程序 RemoteMapJournalSourceCL2
运行 应用程序 A 的 Hazelcast 客户端 - Java 程序 HZClient
此客户端程序根据控制台输入将条目放入地图中。请提供整数输入。
观察结果
执行时,.peek() 记录应用程序 B 的两个节点的值,并且列表计数在应用程序 A 映射中插入 1 个条目时变为 2。
您似乎正在从两个 Jet 客户端提交两个独立的作业。每个作业接收所有 IMap 事件日志项目并将它们推送到同一个 IList,因此预期结果是 IList 包含每个项目的两个实例。
请记住,您仅从 Jet 客户端提交 作业,但它实际上在 Jet 集群内同时在其所有成员上运行。如果您只想要接收器中的一份数据副本,请不要提交相同的作业两次。