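Ignite dataStreamer is not working
I am exploring the Ignite dataStreamer with the following code.
But the output is:
For MessageKey0001, every line of output shows the data as null.
For MessageKey0003, every line of output also shows the data as null.
For MessageKey0002, nothing is printed at all; it looks like the receiver code is not run.
When I change
dataStreamer.addData(i, "data-" + i);
to
IgniteFuture future = dataStreamer.addData(i, "data-" + i);
future.get();
future.get() does not return; it looks like addData never completes?
I am not sure where the problem is. Could someone take a look? Thanks!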
package ignite.streamer;

import org.apache.ignite.*;
import org.apache.ignite.cache.query.annotations.QuerySqlField;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.stream.StreamReceiver;

import javax.cache.Cache;
import java.io.Serializable;
import java.util.Collection;
import java.util.Map;

class IgniteDataStreamer_Person implements Serializable {
    @QuerySqlField(index = true)
    private String name;

    @QuerySqlField(index = true)
    private int age;

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }
}

public class IgniteDataStreamerTest {
    public static void main(String[] args) {
        String configPath = "D:/Software/apache-ignite-fabric-1.7.0-bin/apache-ignite-fabric-1.7.0-bin/config/default-config.xml";
        Ignite ignite = Ignition.start(configPath);
        CacheConfiguration<Integer, String> cfg = new CacheConfiguration<Integer, String>();
        String cacheName = "stream_cache";
        cfg.setName(cacheName);
        cfg.setIndexedTypes(Integer.class, IgniteDataStreamer_Person.class);
        Cache cache = ignite.getOrCreateCache(cfg);
        IgniteDataStreamer<Integer, String> dataStreamer = ignite.dataStreamer(cacheName);
        for (int i = 0; i < 3; i++) {
            dataStreamer.addData(i, "data-" + i);
        }

        //null is got from cache
        for (int i = 0; i < 3; i++) {
            System.out.println(String.format("0001: data is %s ", cache.get(i)));
        }

        dataStreamer.receiver(new StreamReceiver<Integer, String>() {
            public void receive(IgniteCache<Integer, String> cache, Collection<Map.Entry<Integer, String>> entries) throws IgniteException {
                //nothing is printed to console
                for (Map.Entry<Integer, String> entry : entries) {
                    System.out.println(String.format("0002: key is: %s, value is: %s", entry.getKey(), entry.getValue()));
                }
            }
        });

        //null is got from cache
        for (int i = 0; i < 3; i++) {
            System.out.println(String.format("0003: data is %s ", cache.get(i)));
        }

        ignite.close();
    }
}
DataStreamer uses batching to provide good performance. In your case, you should flush the data (using the flush() method) before the blocking future.get() call.
See the IgniteDataStreamer javadoc for details.
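To make the batching behaviour concrete, here is a minimal plain-Java sketch of a streamer that buffers entries until it is flushed. It is a toy model only, not Ignite's implementation: ToyStreamer, its buffer, its pending-future list and the HashMap standing in for the cache are all hypothetical names invented for illustration. It shows why a read right after addData() can return null, and why a blocking get() on the returned future would not return until a flush happens. Ignite's real streamer can also send batches on its own (for example when its buffers fill up), which this sketch leaves out.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Toy model only: NOT Ignite's implementation. All names here are hypothetical.
class ToyStreamer<K, V> {
    // Entries accepted by addData() but not yet written to the cache.
    private final Map<K, V> buffer = new HashMap<>();
    // Futures handed out by addData() that are still waiting for a flush.
    private final List<CompletableFuture<Void>> pending = new ArrayList<>();
    // Stand-in for the cache that readers query.
    private final Map<K, V> cache = new HashMap<>();

    // Buffers the entry and returns a future that completes only on flush.
    CompletableFuture<Void> addData(K key, V value) {
        buffer.put(key, value);
        CompletableFuture<Void> future = new CompletableFuture<>();
        pending.add(future);
        return future;
    }

    // Writes all buffered entries to the cache and completes their futures.
    void flush() {
        cache.putAll(buffer);
        buffer.clear();
        for (CompletableFuture<Void> future : pending) {
            future.complete(null);
        }
        pending.clear();
    }

    // Reads from the cache only; buffered entries are invisible here.
    V get(K key) {
        return cache.get(key);
    }
}

class ToyStreamerDemo {
    public static void main(String[] args) {
        ToyStreamer<Integer, String> streamer = new ToyStreamer<>();
        CompletableFuture<Void> future = streamer.addData(0, "data-0");

        // Before flush: the entry is still buffered, so the read misses and
        // the future is not done (a blocking get() here would never return).
        System.out.println("before flush: " + streamer.get(0) + ", done=" + future.isDone());

        streamer.flush();

        // After flush: the entry is visible and the future has completed.
        System.out.println("after flush: " + streamer.get(0) + ", done=" + future.isDone());
    }
}
```

In the question's code, the corresponding step would be calling dataStreamer.flush() after the addData() loop and before reading from the cache or blocking on the future.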