Why does the state store fail with a serialization issue?
I am using Kafka Streams 1.1.0.
I have created the following topology:
Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000001 (topics: [configurationTopicName])
      --> KTABLE-SOURCE-0000000002
    Processor: KTABLE-SOURCE-0000000002 (stores: [configurationTopicName-STATE-STORE-0000000000])
      --> KTABLE-MAPVALUES-0000000003
      <-- KSTREAM-SOURCE-0000000001
    Processor: KTABLE-MAPVALUES-0000000003 (stores: [configuration_store_application1])
      --> none
      <-- KTABLE-SOURCE-0000000002
The code looks like this:
case class Test(name: String, age: Int)

val mal: Materialized[String, Test, KeyValueStore[Bytes, Array[Byte]]] =
  Materialized.as[String, Test, KeyValueStore[Bytes, Array[Byte]]](configurationStoreName(applicationId))

builder.table(configurationTopicName, Consumed.`with`(Serdes.String(), Serdes.String()))
  .someAdditionalTransformation
  .mapValues[Test](
    new ValueMapperWithKey[String, String, Test] {
      override def apply(readOnlyKey: String, value: String): Test = Test("aaa", 432)
    },
    mal)
I want to build a queryable store that I can use later to query (to retrieve the filtered/transformed values), along the lines of the sketch below.
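Something like this is what I mean by querying the store once the application is running (a sketch; `streams` stands for a running KafkaStreams instance and the key is a placeholder):

import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.state.{QueryableStoreTypes, ReadOnlyKeyValueStore}

// Look up the materialized store by the same name passed to Materialized.as(...)
val store: ReadOnlyKeyValueStore[String, Test] =
  streams.store(
    configurationStoreName(applicationId),
    QueryableStoreTypes.keyValueStore[String, Test]())

val cfg: Test = store.get("some-key") // hypothetical key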
I ran a simple test using TopologyTestDriver, but the following exception was thrown:
Caused by: java.lang.ClassCastException: com.example.kafka.streams.topology.Test cannot be cast to java.lang.String
at org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:28)
at org.apache.kafka.streams.state.StateSerdes.rawValue(StateSerdes.java:178)
at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.innerValue(MeteredKeyValueBytesStore.java:66)
at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.innerValue(MeteredKeyValueBytesStore.java:57)
at org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.put(InnerMeteredKeyValueStore.java:198)
at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.put(MeteredKeyValueBytesStore.java:117)
at org.apache.kafka.streams.kstream.internals.KTableMapValues$KTableMapValuesProcessor.process(KTableMapValues.java:103)
at org.apache.kafka.streams.kstream.internals.KTableMapValues$KTableMapValuesProcessor.process(KTableMapValues.java:83)
at org.apache.kafka.streams.processor.internals.ProcessorNode.run(ProcessorNode.java:46)
at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.forward(AbstractProcessorContext.java:174)
at org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterProcessor.process(KTableFilter.java:89)
at org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterProcessor.process(KTableFilter.java:63)
at org.apache.kafka.streams.processor.internals.ProcessorNode.run(ProcessorNode.java:46)
at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.forward(AbstractProcessorContext.java:174)
at org.apache.kafka.streams.kstream.internals.ForwardingCacheFlushListener.apply(ForwardingCacheFlushListener.java:42)
at org.apache.kafka.streams.state.internals.CachingKeyValueStore.putAndMaybeForward(CachingKeyValueStore.java:101)
at org.apache.kafka.streams.state.internals.CachingKeyValueStore.access$000(CachingKeyValueStore.java:38)
at org.apache.kafka.streams.state.internals.CachingKeyValueStore.apply(CachingKeyValueStore.java:83)
at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:142)
at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:100)
at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:127)
at org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush(CachingKeyValueStore.java:123)
at org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.flush(InnerMeteredKeyValueStore.java:267)
at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.flush(MeteredKeyValueBytesStore.java:149)
at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:244)
... 58 more
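For reference, the test that triggers this is essentially the following (a sketch; the config values are placeholders, and the default String serdes reflect my test config):

import java.util.Properties
import org.apache.kafka.common.serialization.{Serdes, StringSerializer}
import org.apache.kafka.streams.{StreamsConfig, TopologyTestDriver}
import org.apache.kafka.streams.test.ConsumerRecordFactory

val props = new Properties()
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-app")      // placeholder
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234") // never contacted by the test driver
// The defaults the store falls back to when no explicit serdes are given;
// in my case String, which matches the StringSerializer in the stack trace.
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass.getName)
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass.getName)

val driver = new TopologyTestDriver(builder.build(), props)
val factory = new ConsumerRecordFactory[String, String](
  configurationTopicName, new StringSerializer, new StringSerializer)

driver.pipeInput(factory.create(configurationTopicName, "key1", "value1"))
driver.close() // flushing the caching layer is where the ClassCastException surfaces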
Any idea why this happens and how to fix it?
After some investigation I found the cause of the above exception.
I created the Materialized to store the data, but I did not pass any Serdes for the key or the value. If you don't pass any, the defaults are used; in my case that was the StringSerializer, and I was trying to serialize an object of the Test class with the StringSerializer. Mea culpa.
To pass the value Serde, just add .withValueSerde(GenericSerde[Test]), where GenericSerde is an implementation of org.apache.kafka.common.serialization.Serde:
val mal: Materialized[String, Test, KeyValueStore[Bytes, Array[Byte]]] =
  Materialized.as[String, Test, KeyValueStore[Bytes, Array[Byte]]](configurationStoreName(applicationId))
    .withValueSerde(GenericSerde[Test])
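A GenericSerde could look roughly like this (a JSON-based sketch, assuming jackson-module-scala is on the classpath; my actual implementation may differ):

import java.util
import scala.reflect.ClassTag
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.kafka.common.serialization.{Deserializer, Serde, Serializer}

// Round-trips any case class through JSON bytes.
class GenericSerde[T >: Null](implicit tag: ClassTag[T]) extends Serde[T] {
  private val mapper = new ObjectMapper().registerModule(DefaultScalaModule)

  override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()
  override def close(): Unit = ()

  override def serializer(): Serializer[T] = new Serializer[T] {
    override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()
    override def serialize(topic: String, data: T): Array[Byte] =
      if (data == null) null else mapper.writeValueAsBytes(data)
    override def close(): Unit = ()
  }

  override def deserializer(): Deserializer[T] = new Deserializer[T] {
    override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()
    override def deserialize(topic: String, data: Array[Byte]): T =
      if (data == null) null
      else mapper.readValue(data, tag.runtimeClass.asInstanceOf[Class[T]])
    override def close(): Unit = ()
  }
}

object GenericSerde {
  // Enables the GenericSerde[Test] call syntax used above.
  def apply[T >: Null : ClassTag]: GenericSerde[T] = new GenericSerde[T]
}

Any Serde that can round-trip Test (JSON, Avro, Kryo, ...) would work here; the only requirement is that the value Serde on the Materialized matches the type produced by mapValues.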