如何在 Kafka 流中创建一个以 HashMap 作为值的状态存储?
How to create a state store with HashMap as value in Kafka streams?
我需要创建一个以字符串键 HashMap 作为值的状态存储。我尝试了以下两种方法。
// First method
StateStoreSupplier avgStoreNew = Stores.create("AvgsNew")
.withKeys(Serdes.String())
.withValues(HashMap.class)
.persistent()
.build();
// Second method
HashMap<String ,Double> h = new HashMap<String ,Double>();
StateStoreSupplier avgStore1 = Stores.create("Avgs")
.withKeys(Serdes.String())
.withValues(Serdes.serdeFrom(h.getClass()))
.persistent()
.build();
代码编译正常,没有任何错误,但出现运行时错误
io.confluent.examples.streams.WordCountProcessorAPIException in thread "main" java.lang.IllegalArgumentException: Unknown class for built-in serializer
有人可以建议我创建 state store 的正确方法是什么吗?
如果要创建state store,需要为类型提供serializer和deserializer class你想用。在 Kafka Stream 中,有一个名为 Serde 的抽象,它将序列化器和反序列化器包装在一个 class.
中
如果你使用 .withValues(Class<K> keyClass)
它必须保持
@param keyClass the class for the keys, which must be one of the types for which Kafka has built-in serdes
因为 HashMap
没有内置的 Serdes
你需要先实现一个(可能叫做 HashMapSerde
)然后把这个 class 给方法 .withValues(Serde<K> keySerde)
。此外,您还必须为 HashMap
实现实际的序列化器和反序列化器。如果你知道你的 HashMap 的泛型类型,你应该指定它们(这使得序列化器和反序列化器的实现更加简单。
类似这样的东西(只是一个草图;省略了通用类型):
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.Deserializer;
public class HashMapSerde implements Serde<HashMap> {
void configure(Map<String, ?> configs, boolean isKey) {
/* put your code here */
}
void close() {
/* put your code here */
}
Serializer<HashMap> serializer() {
return new Serializer<HashMap>() {
public void configure(Map<String, ?> configs, boolean isKey) {
/* put your code here */
}
public byte[] serialize(String topic, T data) {
/* put your code here */
}
public void close() {
/* put your code here */
}
};
}
Deserializer<HashMap> deserializer() {
return new Deserializer<HashMap>() {
public void configure(Map<String, ?> configs, boolean isKey) {
/* put your code here */
}
public T deserialize(String topic, byte[] data) {
/* put your code here */
}
public void close() {
/* put your code here */
}
};
}
}
如果您想查看有关如何实现(反)序列化器和 Serde
的示例,请查看 https://github.com/apache/kafka/tree/trunk/clients/src/main/java/org/apache/kafka/common/serialization and https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/serialization/Serdes.java
我需要创建一个以字符串键 HashMap 作为值的状态存储。我尝试了以下两种方法。
// First method
StateStoreSupplier avgStoreNew = Stores.create("AvgsNew")
.withKeys(Serdes.String())
.withValues(HashMap.class)
.persistent()
.build();
// Second method
HashMap<String ,Double> h = new HashMap<String ,Double>();
StateStoreSupplier avgStore1 = Stores.create("Avgs")
.withKeys(Serdes.String())
.withValues(Serdes.serdeFrom(h.getClass()))
.persistent()
.build();
代码编译正常,没有任何错误,但出现运行时错误
io.confluent.examples.streams.WordCountProcessorAPIException in thread "main" java.lang.IllegalArgumentException: Unknown class for built-in serializer
有人可以建议我创建 state store 的正确方法是什么吗?
如果要创建state store,需要为类型提供serializer和deserializer class你想用。在 Kafka Stream 中,有一个名为 Serde 的抽象,它将序列化器和反序列化器包装在一个 class.
中如果你使用 .withValues(Class<K> keyClass)
它必须保持
@param keyClass the class for the keys, which must be one of the types for which Kafka has built-in serdes
因为 HashMap
没有内置的 Serdes
你需要先实现一个(可能叫做 HashMapSerde
)然后把这个 class 给方法 .withValues(Serde<K> keySerde)
。此外,您还必须为 HashMap
实现实际的序列化器和反序列化器。如果你知道你的 HashMap 的泛型类型,你应该指定它们(这使得序列化器和反序列化器的实现更加简单。
类似这样的东西(只是一个草图;省略了通用类型):
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.Deserializer;
public class HashMapSerde implements Serde<HashMap> {
void configure(Map<String, ?> configs, boolean isKey) {
/* put your code here */
}
void close() {
/* put your code here */
}
Serializer<HashMap> serializer() {
return new Serializer<HashMap>() {
public void configure(Map<String, ?> configs, boolean isKey) {
/* put your code here */
}
public byte[] serialize(String topic, T data) {
/* put your code here */
}
public void close() {
/* put your code here */
}
};
}
Deserializer<HashMap> deserializer() {
return new Deserializer<HashMap>() {
public void configure(Map<String, ?> configs, boolean isKey) {
/* put your code here */
}
public T deserialize(String topic, byte[] data) {
/* put your code here */
}
public void close() {
/* put your code here */
}
};
}
}
如果您想查看有关如何实现(反)序列化器和 Serde
的示例,请查看 https://github.com/apache/kafka/tree/trunk/clients/src/main/java/org/apache/kafka/common/serialization and https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/serialization/Serdes.java