Is Kafka streaming to Ignite ACID when the cluster runs in transactional mode?

I have a distributed database in Apache Ignite and an Apache Kafka streaming service that streams data into the Ignite cluster. The Kafka streaming works as follows:

  1. Create an Ignite node to find the cluster
  2. Start the Kafka streamer in the cluster as a cluster-singleton service
  3. Close the Ignite node

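The Ignite cluster is in transactional mode, but I am not sure whether that guarantees ACID or merely enables it. Can this kind of streaming service into Ignite be considered ACID?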

Here is the code for the Kafka streamer:

public class IgniteKafkaStreamerService implements Service {

private static final long serialVersionUID = 1L;

@IgniteInstanceResource
private Ignite ignite;
private KafkaStreamer<String, JSONObject> kafkaStreamer = new KafkaStreamer<>();
private IgniteLogger logger;

public static void main(String[] args) throws InterruptedException {
    TcpDiscoverySpi spi = new TcpDiscoverySpi();

    TcpDiscoveryMulticastIpFinder ipFinder = new TcpDiscoveryMulticastIpFinder();

    // Set Multicast group.
    //ipFinder.setMulticastGroup("228.10.10.157");

    // Set initial IP addresses.
    // Note that you can optionally specify a port or a port range.
    ipFinder.setAddresses(Arrays.asList("127.0.0.1:47500..47509"));

    spi.setIpFinder(ipFinder);

    IgniteConfiguration cfg = new IgniteConfiguration();

    // Override default discovery SPI.
    cfg.setDiscoverySpi(spi);
    Ignite ignite = Ignition.getOrStart(cfg);

    // Deploy data streamer service on the server nodes.
    ClusterGroup forServers = ignite.cluster().forServers();
    IgniteKafkaStreamerService streamer = new IgniteKafkaStreamerService();
    ignite.services(forServers).deployClusterSingleton("KafkaService", streamer);
    ignite.close();
}


@Override
public void init(ServiceContext ctx) {
    logger = ignite.log();
    IgniteDataStreamer<String, JSONObject> stmr = ignite.dataStreamer("my_cache");
    stmr.allowOverwrite(true);
    stmr.autoFlushFrequency(1000);
    List<String> topics = new ArrayList<>();
    topics.add(0,"IoTData");

    kafkaStreamer.setIgnite(ignite);
    kafkaStreamer.setStreamer(stmr);
    kafkaStreamer.setThreads(4);
    kafkaStreamer.setTopic(topics);
    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "NiFi-consumer");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.242:9092");
    props.put("key.deserializer", StringDeserializer.class.getName());
    props.put("value.deserializer", StringDeserializer.class.getName());
    props.put("group.id", "hello");
    kafkaStreamer.setConsumerConfig(props);
    kafkaStreamer.setSingleTupleExtractor(msg -> {
        JSONObject jsonObj = new JSONObject(msg.value().toString());
        String key = jsonObj.getString("id") + "," + new Date(msg.timestamp());
        JSONObject value = jsonObj.accumulate("date", new Date(msg.timestamp()));

        return new AbstractMap.SimpleEntry<>(key, value);

    });
}

@Override
public void execute(ServiceContext ctx) {
    kafkaStreamer.start();
    logger.info("KafkaStreamer started.");
}

@Override
public void cancel(ServiceContext ctx) {
    kafkaStreamer.stop();
    logger.info("KafkaStreamer stopped.");
}

}

KafkaStreamer uses IgniteDataStreamer under the hood. IgniteDataStreamer is not transactional by nature, so it provides no transactional guarantees.
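
To make the distinction concrete, here is a toy model in plain Java. It is only a sketch and does not use or reproduce Ignite's implementation: the class `StreamerVsTransaction`, its methods, and the simulated failure on keys starting with "bad" are all hypothetical names invented for illustration. It contrasts a streamer-style load, where entries that were already written stay in place when a later write fails, with a transaction-style load, where nothing becomes visible unless every write succeeds.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model only (not Ignite code): non-atomic streaming vs. all-or-nothing commit. */
public class StreamerVsTransaction {

    /** Hypothetical write that fails for keys starting with "bad" (a simulated fault). */
    static void put(Map<String, String> cache, String key) {
        if (key.startsWith("bad")) {
            throw new IllegalStateException("simulated write failure for key " + key);
        }
        cache.put(key, "value-of-" + key);
    }

    /** Streamer-style load: each entry is written as it arrives; nothing is rolled back. */
    public static Map<String, String> streamerStyleLoad(List<String> keys) {
        Map<String, String> cache = new LinkedHashMap<>();
        try {
            for (String key : keys) {
                put(cache, key);
            }
        } catch (IllegalStateException e) {
            // Entries written before the failure remain visible in the cache.
        }
        return cache;
    }

    /** Transaction-style load: writes are staged and published only if all succeed. */
    public static Map<String, String> transactionStyleLoad(List<String> keys) {
        Map<String, String> cache = new LinkedHashMap<>();
        Map<String, String> staged = new LinkedHashMap<>();
        try {
            for (String key : keys) {
                put(staged, key);
            }
            cache.putAll(staged); // "commit": publish all staged writes together
        } catch (IllegalStateException e) {
            // "rollback": the staged writes are discarded, the cache is untouched.
        }
        return cache;
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("a", "b", "bad-record", "c");
        System.out.println("streamer-style:    " + streamerStyleLoad(keys).keySet());
        System.out.println("transaction-style: " + transactionStyleLoad(keys).keySet());
    }
}
```

With the sample keys, the streamer-style load ends up holding `a` and `b`, while the transaction-style load holds nothing. In Ignite itself, transactional guarantees apply to operations performed through the cache API inside a transaction started with `ignite.transactions().txStart()` on a cache whose atomicity mode is `TRANSACTIONAL`; data loaded through a data streamer does not go through that path.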