点燃 dataStreamer 不工作

Ignite dataStreamer is not working

我正在使用以下代码探索 Ignite dataStreamer。 但是输出是:

对于MessageKey0001,输出所有显示数据为空。 对于MessageKey0003,输出也全部显示数据为null 对于 MessageKey0002,输出没有任何显示,看起来接收者代码不是 运行

当我改变

dataStreamer.addData(i, "data-" + i);

    IgniteFuture future = dataStreamer.addData(i, "data-" + i);
    future.get();

future.get() 没有 return,看起来 addData 没有完成? 我不确定问题出在哪里,有人可以看一下吗?谢谢!

package ignite.streamer;

import org.apache.ignite.*;
import org.apache.ignite.cache.query.annotations.QuerySqlField;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.stream.StreamReceiver;

import javax.cache.Cache;
import java.io.Serializable;
import java.util.Collection;
import java.util.Map;

class IgniteDataStreamer_Person implements Serializable {
    @QuerySqlField(index = true)
    private String name;

    @QuerySqlField(index = true)
    private int age;

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }
}

public class IgniteDataStreamerTest {
    public static void main(String[] args) {
        String configPath = "D:/Software/apache-ignite-fabric-1.7.0-bin/apache-ignite-fabric-1.7.0-bin/config/default-config.xml";
        Ignite ignite = Ignition.start(configPath);
        CacheConfiguration<Integer, String> cfg = new CacheConfiguration<Integer, String>();
        String cacheName = "stream_cache";
        cfg.setName(cacheName);
        cfg.setIndexedTypes(Integer.class, IgniteDataStreamer_Person.class);
        Cache cache = ignite.getOrCreateCache(cfg);

        IgniteDataStreamer<Integer, String> dataStreamer = ignite.dataStreamer(cacheName);

        for (int i = 0; i < 3; i++) {
            dataStreamer.addData(i, "data-" + i);
        }

        //null is got from cache
        for (int i = 0; i < 3; i++) {
            System.out.println(String.format("0001: data is %s ", cache.get(i)));
        }

        dataStreamer.receiver(new StreamReceiver<Integer, String>() {
            public void receive(IgniteCache<Integer, String> cache, Collection<Map.Entry<Integer, String>> entries) throws IgniteException {
                //nothing is printed to console
                for (Map.Entry<Integer, String> entry : entries) {
                    System.out.println(String.format("0002: key is: %s, value is: %s", entry.getKey(), entry.getValue()));
                }
            }
        });

        //null is got from cache
        for (int i = 0; i < 3; i++) {
            System.out.println(String.format("0003: data is %s ", cache.get(i)));
        }
        ignite.close();
    }
}

DataStreamer 使用批处理以提供良好的性能。在阻塞 future.get() 方法之前,您应该在您的案例中刷新数据(使用 flush() 方法)。

请参阅 IgniteDataStreamer javadoc 了解详细信息。