Kafka Streams: creating a simple materialized view
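I have events coming into Kafka with a bunch of non-unique string fields and an event timestamp. I want to create a materialized view of these events so that I can query them. For example:
- Show all events
- Show all events where field1 = some string
- Show all events matching multiple fields
- Show events between two dates
All the examples I have seen perform an aggregation, a join, or some other transformation on the stream. I cannot find a simple example that just creates a view over a set of events. I don't want to perform any operation; I just want to be able to query the raw events that come into the stream.
I am using Spring Kafka, so an example using Spring Kafka would be ideal.
I am able to get messages into Kafka and consume them. However, I have not been able to create a materialized view.
I have the following code, which filters the events (not what I really want, since I want all events, but I just wanted to see whether I could get a materialized view at all):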
    @StreamListener
    public void process(@Input("input") KTable<String,MyMessage> myMessages) {
        keyValueStore = interactiveQueryService.getQueryableStore(ALL_MESSAGES, QueryableStoreTypes.keyValueStore());
        myMessages.filter((key, value) -> (value.getKey() != null));
        Materialized.<String, MyMessage, KeyValueStore<Bytes, byte[]>>as(ALL_MESSAGES)
            .withKeySerde(Serdes.String())
            .withValueSerde(new MyMessageSerde());
    }
This throws the following exception:
java.lang.ClassCastException: [B cannot be cast to MyMessage
at org.apache.kafka.streams.kstream.internals.KTableFilter.computeValue(KTableFilter.java:57)
at org.apache.kafka.streams.kstream.internals.KTableFilter.access0(KTableFilter.java:25)
at org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterProcessor.process(KTableFilter.java:79)
at org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterProcessor.process(KTableFilter.java:63)
at org.apache.kafka.streams.processor.internals.ProcessorNode.run(ProcessorNode.java:50)
at org.apache.kafka.streams.processor.internals.ProcessorNode.runAndMeasureLatency(ProcessorNode.java:244)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:126)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
at org.apache.kafka.streams.kstream.internals.ForwardingCacheFlushListener.apply(ForwardingCacheFlushListener.java:42)
at org.apache.kafka.streams.state.internals.CachingKeyValueStore.putAndMaybeForward(CachingKeyValueStore.java:101)
at org.apache.kafka.streams.state.internals.CachingKeyValueStore.access$000(CachingKeyValueStore.java:38)
at org.apache.kafka.streams.state.internals.CachingKeyValueStore.apply(CachingKeyValueStore.java:83)
at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:141)
at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:99)
at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:125)
at org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush(CachingKeyValueStore.java:123)
at org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.flush(InnerMeteredKeyValueStore.java:284)
at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.flush(MeteredKeyValueBytesStore.java:149)
at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:239)
... 21 more
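I don't understand why, because I set the store's valueSerde to MyMessageSerde, which knows how to serialize/deserialize MyMessage to and from a byte array.
Update
I changed the code to the following:
    myMessages.filter((key,value) -> (value.getKey() != null));
and added the following to my application.yml: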
    spring.cloud.stream.kafka.streams.bindings.input:
      consumer:
        materializedAs: all-messages
        key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        value-deserializer: MyMessageDeserializer
Now I get the following stack trace:
Exception in thread "raven-a43f181b-ccb6-4d9b-a8fd-9fe96542c210-StreamThread-1" org.apache.kafka.streams.errors.ProcessorStateException: task [0_3] Failed to flush state store all-messages
at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:242)
at org.apache.kafka.streams.processor.internals.AbstractTask.flushState(AbstractTask.java:202)
at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:420)
at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:394)
at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:382)
at org.apache.kafka.streams.processor.internals.AssignedTasks.apply(AssignedTasks.java:67)
at org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:362)
at org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:352)
at org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:401)
at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1042)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:845)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:767)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:736)
Caused by: java.lang.ClassCastException: [B cannot be cast to MyMessage
at org.apache.kafka.streams.kstream.internals.KTableFilter.computeValue(KTableFilter.java:57)
at org.apache.kafka.streams.kstream.internals.KTableFilter.access0(KTableFilter.java:25)
at org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterProcessor.process(KTableFilter.java:79)
at org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterProcessor.process(KTableFilter.java:63)
at org.apache.kafka.streams.processor.internals.ProcessorNode.run(ProcessorNode.java:50)
at org.apache.kafka.streams.processor.internals.ProcessorNode.runAndMeasureLatency(ProcessorNode.java:244)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:126)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
at org.apache.kafka.streams.kstream.internals.ForwardingCacheFlushListener.apply(ForwardingCacheFlushListener.java:42)
at org.apache.kafka.streams.state.internals.CachingKeyValueStore.putAndMaybeForward(CachingKeyValueStore.java:101)
at org.apache.kafka.streams.state.internals.CachingKeyValueStore.access$000(CachingKeyValueStore.java:38)
at org.apache.kafka.streams.state.internals.CachingKeyValueStore.apply(CachingKeyValueStore.java:83)
at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:141)
at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:99)
at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:125)
at org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush(CachingKeyValueStore.java:123)
at org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.flush(InnerMeteredKeyValueStore.java:284)
at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.flush(MeteredKeyValueBytesStore.java:149)
at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:239)
... 12 more
I was able to create the materialized view as follows.
Configuration in application.yml:
    spring.cloud.stream.kafka.streams.bindings.input:
      consumer:
        materializedAs: all-messages
        keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
        valueSerde: com.me.MyMessageSerde
      producer:
        keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
        valueSerde: com.me.MyMessageSerde
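This sets up the correct serdes and the materialized view.
The following code, using the configuration above, creates the KTable that backs the materialized view:
    public void process(@Input("input") KTable<String,MyMessage> myMessages) {
    }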
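The kind of queries you want are not supported. Note that there are no secondary indexes; only regular key-based lookups and range queries are supported.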
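To make "key-based lookups and ranges" concrete, here is a minimal sketch in which a plain `TreeMap` stands in for the state store; it is not the Kafka Streams API. The class `KeyRangeSketch`, the composite key layout, and the helpers `key(...)` and `between(...)` are hypothetical names chosen for illustration. The only point is that a range query can serve a date filter if the key itself sorts by time.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Stand-in for a key-value store: a sorted map offers the two access
// patterns named above, point lookup by key and range scan over keys.
final class KeyRangeSketch {

    // Hypothetical composite key: zero-padded epoch millis, then an event id,
    // so that lexicographic key order matches time order.
    static String key(long epochMillis, String eventId) {
        return String.format("%013d-%s", epochMillis, eventId);
    }

    // Returns the values whose keys fall in the window [fromMillis, toMillis).
    static List<String> between(NavigableMap<String, String> store, long fromMillis, long toMillis) {
        String from = String.format("%013d", fromMillis);
        String to = String.format("%013d", toMillis);
        return new ArrayList<>(store.subMap(from, true, to, false).values());
    }

    public static void main(String[] args) {
        NavigableMap<String, String> store = new TreeMap<>();
        store.put(key(1000L, "a"), "event-a");
        store.put(key(2000L, "b"), "event-b");
        store.put(key(3000L, "c"), "event-c");

        System.out.println(store.get(key(2000L, "b")));   // point lookup: event-b
        System.out.println(between(store, 1000L, 3000L)); // range scan: [event-a, event-b]
    }
}
```

In Kafka Streams the analogous calls would be `get` and `range` on a `ReadOnlyKeyValueStore`. Ordering there depends on the serialized key bytes, so the ordering assumption in this sketch would need to be checked against the key serde actually in use.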
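If you know all queries upfront, you can re-group the data into derived KTables that use the query attribute as the key. Note that keys must be unique, so if a query attribute contains non-unique data, you need to use some Collection type as the value: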
    KTable originalTable = builder.table(...);
    KTable keyedByFieldATable = originalTable
        .groupBy(/* select field A as the new key */)
        .aggregate(/* the aggregation returns a list (or similar) of entries for the key */);
Note that the storage requirement is duplicated each time you re-key the original table.
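The shape of such a derived, re-keyed view can be sketched with plain Java collections. This is only an illustration of the data layout, not Kafka Streams code: `RekeyByFieldSketch`, the `Event` class, and its `id` and `field1` members are hypothetical, since the real fields of MyMessage are not shown in the question.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class RekeyByFieldSketch {

    // Hypothetical event shape used only for this sketch.
    static final class Event {
        final String id;
        final String field1;

        Event(String id, String field1) {
            this.id = id;
            this.field1 = field1;
        }
    }

    // Builds a derived view keyed by field1. Because field1 is not unique,
    // each key maps to a list of events rather than to a single event.
    static Map<String, List<Event>> keyedByField1(Iterable<Event> events) {
        Map<String, List<Event>> view = new HashMap<>();
        for (Event e : events) {
            view.computeIfAbsent(e.field1, k -> new ArrayList<>()).add(e);
        }
        return view;
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(
                new Event("1", "red"), new Event("2", "blue"), new Event("3", "red"));
        Map<String, List<Event>> view = keyedByField1(events);
        // Both "red" events end up under the same key.
        System.out.println(view.get("red").size());
    }
}
```

Every event is held once in the original collection and once more in the derived view, which is the storage duplication mentioned above. One such view would be needed per queried attribute.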
As an alternative, you can do a full table scan over the original table and evaluate the filter conditions while consuming the returned iterator.
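A minimal sketch of the scan-and-filter approach, again with a plain Java map standing in for the store. `FullScanSketch`, the `Event` class, and its fields are hypothetical. In Kafka Streams the iterator would come from `ReadOnlyKeyValueStore.all()`, which returns a `KeyValueIterator` that should be closed after use; that part is not modelled here.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

final class FullScanSketch {

    // Hypothetical event shape: two string fields and an event timestamp.
    static final class Event {
        final String field1;
        final String field2;
        final long timestamp; // epoch millis

        Event(String field1, String field2, long timestamp) {
            this.field1 = field1;
            this.field2 = field2;
            this.timestamp = timestamp;
        }
    }

    // Walks every entry and keeps the values that satisfy the filter.
    static List<Event> scan(Iterator<Map.Entry<String, Event>> all, Predicate<Event> filter) {
        List<Event> hits = new ArrayList<>();
        while (all.hasNext()) {
            Event e = all.next().getValue();
            if (filter.test(e)) {
                hits.add(e);
            }
        }
        return hits;
    }

    public static void main(String[] args) {
        Map<String, Event> store = new LinkedHashMap<>();
        store.put("k1", new Event("red", "x", 1000L));
        store.put("k2", new Event("red", "y", 2000L));
        store.put("k3", new Event("blue", "x", 3000L));

        // Match on several fields at once.
        Predicate<Event> redAndX = e -> e.field1.equals("red") && e.field2.equals("x");
        // Match on a time window [1500, 3500).
        Predicate<Event> window = e -> e.timestamp >= 1500L && e.timestamp < 3500L;

        System.out.println(scan(store.entrySet().iterator(), redAndX).size()); // 1
        System.out.println(scan(store.entrySet().iterator(), window).size());  // 2
    }
}
```

No extra storage is needed, but every query touches every entry, so the cost grows with the size of the table rather than with the size of the result.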
This is a space vs. CPU trade-off. Maybe Kafka Streams is not the right tool for your problem.