连接两个 Kafka KTable 会导致 RocksDB 中出现 Nullpointer
Joining two Kafka KTable results in a Nullpointer in RocksDB
我正在尝试加入两个 Kafka Stream DSL KTable
使用:
KTable<String, String> source = builder.table("stream-source");
KTable<String, String> target = builder.table("stream-target");
source.join(target, new ValueJoiner<String, String, String>() {
public String apply(String value1, String value2) {
return value1 + ":" + value2;
}
});
我已经确保键和值都不是 null
:
Producer<String, String> producer = new KafkaProducer<String, String>(props);
for(int i = 0; i < PERSONS_SOURCE.length; i++) {
producer.send(new ProducerRecord<String, String>("stream-source", Long.toString(i + 1L), PERSONS_SOURCE[i]));
}
for(int i = 0; i < PERSONS_TARGET.length; i++) {
producer.send(new ProducerRecord<String, String>("stream-target", Long.toString(i + 1L), PERSONS_TARGET[i]));
}
producer.close();
但是应用程序报告分区的RocksDB层存在空指针
[2016-07-17 21:58:04,682] ERROR User provided listener org.apache.kafka.streams.processor.internals.StreamThread for group streams-persons2 failed on partition assignment (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
java.lang.NullPointerException
at org.rocksdb.RocksDB.put(RocksDB.java:432)
at org.apache.kafka.streams.state.internals.RocksDBStore.putInternal(RocksDBStore.java:299)
at org.apache.kafka.streams.state.internals.RocksDBStore.access0(RocksDBStore.java:62)
at org.apache.kafka.streams.state.internals.RocksDBStore.restore(RocksDBStore.java:206)
at org.apache.kafka.streams.processor.internals.ProcessorStateManager.restoreActiveState(ProcessorStateManager.java:245)
at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:210)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.register(ProcessorContextImpl.java:116)
at org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:202)
发现问题是由于在应用程序代码中创建流而不是使用命令造成的:-
kafka-topics --create --topic stream-a --replication-factor 1 --partitions 1
似乎连接需要分区信息才能工作。
我正在尝试加入两个 Kafka Stream DSL KTable
使用:
KTable<String, String> source = builder.table("stream-source");
KTable<String, String> target = builder.table("stream-target");
source.join(target, new ValueJoiner<String, String, String>() {
public String apply(String value1, String value2) {
return value1 + ":" + value2;
}
});
我已经确保键和值都不是 null
:
Producer<String, String> producer = new KafkaProducer<String, String>(props);
for(int i = 0; i < PERSONS_SOURCE.length; i++) {
producer.send(new ProducerRecord<String, String>("stream-source", Long.toString(i + 1L), PERSONS_SOURCE[i]));
}
for(int i = 0; i < PERSONS_TARGET.length; i++) {
producer.send(new ProducerRecord<String, String>("stream-target", Long.toString(i + 1L), PERSONS_TARGET[i]));
}
producer.close();
但是应用程序报告分区的RocksDB层存在空指针
[2016-07-17 21:58:04,682] ERROR User provided listener org.apache.kafka.streams.processor.internals.StreamThread for group streams-persons2 failed on partition assignment (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) java.lang.NullPointerException at org.rocksdb.RocksDB.put(RocksDB.java:432) at org.apache.kafka.streams.state.internals.RocksDBStore.putInternal(RocksDBStore.java:299) at org.apache.kafka.streams.state.internals.RocksDBStore.access0(RocksDBStore.java:62) at org.apache.kafka.streams.state.internals.RocksDBStore.restore(RocksDBStore.java:206) at org.apache.kafka.streams.processor.internals.ProcessorStateManager.restoreActiveState(ProcessorStateManager.java:245) at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:210) at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.register(ProcessorContextImpl.java:116) at org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:202)
发现问题是由于在应用程序代码中创建流而不是使用命令造成的:-
kafka-topics --create --topic stream-a --replication-factor 1 --partitions 1
似乎连接需要分区信息才能工作。