Kafka Streams:java.lang.IllegalArgumentException:VoidDeserializer 的数据应该为空
Kafka Streams: java.lang.IllegalArgumentException: Data should be null for a VoidDeserializer
我正在研究我的第一个 Kafka Streams 样本:
package com.example;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
class DslExample {
public static void main(String[] args) {
// the builder is used to construct the topology
StreamsBuilder builder = new StreamsBuilder();
// read from the source topic, "users"
KStream<Void, String> stream = builder.stream("users");
// for each record that appears in the source topic,
// print the value
stream.foreach(
(key, value) -> {
System.out.println("(DSL) Hello, " + value);
});
// you can also print using the `print` operator
// stream.print(Printed.<String, String>toSysOut().withLabel("source"));
// set the required properties for running Kafka Streams
Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "dev1");
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "10.0.0.24:29092");
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Void().getClass());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
// build the topology and start streaming
KafkaStreams streams = new KafkaStreams(builder.build(), config);
streams.start();
// close Kafka Streams when the JVM shuts down (e.g. SIGTERM)
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
}
}
当我尝试 运行 时,我得到了这个错误:
Caused by: java.lang.IllegalArgumentException: Data should be null for a VoidDeserializer.
这是来自“用户”主题的示例消息:
值:
{
"registertime": 1517518703752,
"userid": "User_8",
"regionid": "Region_7",
"gender": "OTHER"
}
Header:
[
{
"key": "task.generation",
"stringValue": "0"
},
{
"key": "task.id",
"stringValue": "0"
},
{
"key": "current.iteration",
"stringValue": "86144"
}
]
键:
User_8
我应该怎么做才能避免这个问题?
如果密钥实际上有数据,则不应使用 Serdes.Void()
或 KStream<Void,
我正在研究我的第一个 Kafka Streams 样本:
package com.example;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
class DslExample {
public static void main(String[] args) {
// the builder is used to construct the topology
StreamsBuilder builder = new StreamsBuilder();
// read from the source topic, "users"
KStream<Void, String> stream = builder.stream("users");
// for each record that appears in the source topic,
// print the value
stream.foreach(
(key, value) -> {
System.out.println("(DSL) Hello, " + value);
});
// you can also print using the `print` operator
// stream.print(Printed.<String, String>toSysOut().withLabel("source"));
// set the required properties for running Kafka Streams
Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "dev1");
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "10.0.0.24:29092");
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Void().getClass());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
// build the topology and start streaming
KafkaStreams streams = new KafkaStreams(builder.build(), config);
streams.start();
// close Kafka Streams when the JVM shuts down (e.g. SIGTERM)
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
}
}
当我尝试 运行 时,我得到了这个错误:
Caused by: java.lang.IllegalArgumentException: Data should be null for a VoidDeserializer.
这是来自“用户”主题的示例消息:
值:
{
"registertime": 1517518703752,
"userid": "User_8",
"regionid": "Region_7",
"gender": "OTHER"
}
Header:
[
{
"key": "task.generation",
"stringValue": "0"
},
{
"key": "task.id",
"stringValue": "0"
},
{
"key": "current.iteration",
"stringValue": "86144"
}
]
键:
User_8
我应该怎么做才能避免这个问题?
如果密钥实际上有数据,则不应使用 Serdes.Void()
或 KStream<Void,