Apache Ignite Kafka connection issues
I am trying to do stream processing and CEP on a Kafka message stream. For this I chose Apache Ignite to implement a prototype first. However, I cannot connect to the queue:
Using
kafka_2.11-0.10.1.0
apache-ignite-fabric-1.8.0-bin
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
Kafka works fine; I tested it with a consumer.
Then I start Ignite and run the following in a Spring Boot command-line application.
KafkaStreamer<String, String, String> kafkaStreamer = new KafkaStreamer<>();
Ignition.setClientMode(true);
Ignite ignite = Ignition.start();
Properties settings = new Properties();
// Set a few key parameters
settings.put("bootstrap.servers", "localhost:9092");
settings.put("group.id", "test");
settings.put("zookeeper.connect", "localhost:2181");
settings.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
settings.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
settings.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
settings.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// Create an instance of ConsumerConfig from the Properties instance
kafka.consumer.ConsumerConfig config = new ConsumerConfig(settings);
IgniteCache<String, String> cache = ignite.getOrCreateCache("myCache");
try (IgniteDataStreamer<String, String> stmr = ignite.dataStreamer("myCache")) {
    // allow overwriting cache data
    stmr.allowOverwrite(true);
    kafkaStreamer.setIgnite(ignite);
    kafkaStreamer.setStreamer(stmr);
    // set the topic
    kafkaStreamer.setTopic("test");
    // set the number of threads to process Kafka streams
    kafkaStreamer.setThreads(1);
    // set Kafka consumer configurations
    kafkaStreamer.setConsumerConfig(config);
    // set decoders
    StringDecoder keyDecoder = new StringDecoder(null);
    StringDecoder valueDecoder = new StringDecoder(null);
    kafkaStreamer.setKeyDecoder(keyDecoder);
    kafkaStreamer.setValueDecoder(valueDecoder);
    kafkaStreamer.start();
} finally {
    kafkaStreamer.stop();
}
When the application starts I get
2017-02-23 10:25:23.409 WARN 1388 --- [main] kafka.utils.VerifiableProperties : Property bootstrap.servers is not valid
2017-02-23 10:25:23.410 INFO 1388 --- [main] kafka.utils.VerifiableProperties : Property group.id is overridden to test
2017-02-23 10:25:23.410 WARN 1388 --- [main] kafka.utils.VerifiableProperties : Property key.deserializer is not valid
2017-02-23 10:25:23.411 WARN 1388 --- [main] kafka.utils.VerifiableProperties : Property key.serializer is not valid
2017-02-23 10:25:23.411 WARN 1388 --- [main] kafka.utils.VerifiableProperties : Property value.deserializer is not valid
2017-02-23 10:25:23.411 WARN 1388 --- [main] kafka.utils.VerifiableProperties : Property value.serializer is not valid
2017-02-23 10:25:23.411 INFO 1388 --- [main] kafka.utils.VerifiableProperties : Property zookeeper.connect is overridden to localhost:2181
and then
2017-02-23 10:25:24.057 WARN 1388 --- [r-finder-thread] kafka.client.ClientUtils$ : Fetching topic metadata with correlation id 0 for topics [Set(test)] from broker [BrokerEndPoint(0,user.local,9092)] failed
java.nio.channels.ClosedChannelException: null
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:110) ~[kafka_2.11-0.10.0.1.jar:na]
    at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:80) ~[kafka_2.11-0.10.0.1.jar:na]
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:79) ~[kafka_2.11-0.10.0.1.jar:na]
    at kafka.producer.SyncProducer.send(SyncProducer.scala:124) ~[kafka_2.11-0.10.0.1.jar:na]
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59) [kafka_2.11-0.10.0.1.jar:na]
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:94) [kafka_2.11-0.10.0.1.jar:na]
    at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66) [kafka_2.11-0.10.0.1.jar:na]
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63) [kafka_2.11-0.10.0.1.jar:na]
and reading from the queue does not work.
Does anyone know how to fix this?
Edit: If I comment out the contents of the finally block, I get the following error
2017-02-27 16:42:27.780 ERROR 29946 --- [pool-3-thread-1] : Message is ignored due to an error [msg=MessageAndMetadata(test,0,Message(magic = 1, attributes = 0, CreateTime = -1, crc = 2558126716, key = java.nio.HeapByteBuffer[pos=0 lim=1 cap=79], payload = java.nio.HeapByteBuffer[pos=0 lim=74 cap=74]),15941704,kafka.serializer.StringDecoder@74a96647,kafka.serializer.StringDecoder@42849d34,-1,CreateTime)]
java.lang.IllegalStateException: Data streamer has been closed.
    at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.enterBusy(DataStreamerImpl.java:401) ~[ignite-core-1.8.0.jar:1.8.0]
    at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.addDataInternal(DataStreamerImpl.java:613) ~[ignite-core-1.8.0.jar:1.8.0]
    at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.addData(DataStreamerImpl.java:667) ~[ignite-core-1.8.0.jar:1.8.0]
    at org.apache.ignite.stream.kafka.KafkaStreamer$1.run(KafkaStreamer.java:180) ~[ignite-kafka-1.8.0.jar:1.8.0]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_111]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_111]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_111]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_111]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_111]
Thanks!
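I think this happens because KafkaStreamer is closed right after it is started: kafkaStreamer.stop() is called in the finally block. kafkaStreamer.start() is not synchronous; it only spawns threads that consume from Kafka and then returns. The try-with-resources block has the same effect on the IgniteDataStreamer, which is closed as soon as the block exits; that would explain the "Data streamer has been closed" error that appears once the contents of the finally block are commented out.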
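To make the lifecycle problem concrete without needing Ignite or Kafka on the classpath, here is a minimal plain-Java sketch. Sink and Source are hypothetical stand-ins for IgniteDataStreamer and KafkaStreamer (they are not the real classes, and the latch that simulates a late message is an assumption of the sketch). It only shows why closing the sink right after a non-blocking start() drops messages, and that keeping it open while the worker runs does not.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class StreamerLifecycleSketch {

    /** Hypothetical stand-in for IgniteDataStreamer: rejects data once closed. */
    static final class Sink implements AutoCloseable {
        private volatile boolean closed;
        final AtomicInteger accepted = new AtomicInteger();

        void addData(String msg) {
            if (closed)
                throw new IllegalStateException("Data streamer has been closed.");
            accepted.incrementAndGet();
        }

        @Override public void close() { closed = true; }
    }

    /** Hypothetical stand-in for KafkaStreamer: start() hands work to a thread and returns. */
    static final class Source {
        private final ExecutorService pool = Executors.newSingleThreadExecutor();
        final CountDownLatch done = new CountDownLatch(1);

        void start(Sink sink, CountDownLatch messageArrives) {
            pool.execute(() -> {
                try {
                    messageArrives.await();   // the message shows up after start() has returned
                    sink.addData("hello");
                } catch (IllegalStateException e) {
                    // sink was already closed, so the message is dropped
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    done.countDown();
                }
            });
        }

        void stop() { pool.shutdown(); }
    }

    /** Waits for the worker to finish, without exposing checked exceptions to callers. */
    private static void awaitDone(CountDownLatch latch) {
        try {
            if (!latch.await(5, TimeUnit.SECONDS))
                throw new IllegalStateException("timed out waiting for worker");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    /** Mirrors the question: the sink is closed as soon as start() returns. */
    public static int closedRightAfterStart() {
        CountDownLatch messageArrives = new CountDownLatch(1);
        Source source = new Source();
        Sink sink = new Sink();
        try (Sink s = sink) {
            source.start(s, messageArrives);
        }                                   // sink.close() runs here
        messageArrives.countDown();         // the message arrives too late
        awaitDone(source.done);
        source.stop();
        return sink.accepted.get();
    }

    /** Keeps the sink open until the worker has processed the message. */
    public static int keptOpenWhileConsuming() {
        CountDownLatch messageArrives = new CountDownLatch(1);
        Source source = new Source();
        Sink sink = new Sink();
        source.start(sink, messageArrives);
        messageArrives.countDown();
        awaitDone(source.done);
        source.stop();                      // stop the source first, then close the sink
        sink.close();
        return sink.accepted.get();
    }

    public static void main(String[] args) {
        System.out.println("closed right after start: accepted=" + closedRightAfterStart());
        System.out.println("kept open while consuming: accepted=" + keptOpenWhileConsuming());
    }
}
```

Running main prints accepted=0 for the first pattern and accepted=1 for the second. In the real application the equivalent would presumably be to create the data streamer and the KafkaStreamer outside any try-with-resources block, keep them alive for as long as messages should be consumed, and call stop() and close() only on shutdown.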