Kafka Streams 将 KTable 值映射到单独的值
Kafka Streams mapping KTable value into separate values
我有一个 Kafka Stream 应用程序,它将单个主题读取为 KTable
,将每个元素转换为 0-n
元素并将它们全部写入另一个主题。流程看起来像这样(简化了很多):
('a', '123') -> ('a', '1,2,3') -> ('a1', '1'), ('a2', '2'), ('a3', '3')
使用 Kafka Stream DSL 是否可行?使用的所有主题都是compact
,因此我想模拟一个table并且永远不要摆脱旧值。
tl;dr;如何将一条消息转换为多条消息?
不确定您是否可以使用 DSL 表达这一点。但是使用 Processor API:
做到这一点相当简单
builder.stream("input-topic").transform(...).to("output-topic");
您将键值存储附加到您的 transform()
并对每个输入记录执行以下操作:
- 检查store中是否有对应的键值对
- 如果是(即
store.get()!=null
),则从store中取出旧值拆分;将每个 "split record" 的值替换为 null
并发出所有这些记录
- 如果输入记录
value!=null
,将输入记录按原样放入存储并拆分输入记录并发出单独的输出记录
- 如果输入记录
value==null
,从存储中删除 key
我有一个 Kafka Stream 应用程序,它将单个主题读取为 KTable
,将每个元素转换为 0-n
元素并将它们全部写入另一个主题。流程看起来像这样(简化了很多):
('a', '123') -> ('a', '1,2,3') -> ('a1', '1'), ('a2', '2'), ('a3', '3')
使用 Kafka Stream DSL 是否可行?使用的所有主题都是compact
,因此我想模拟一个table并且永远不要摆脱旧值。
tl;dr;如何将一条消息转换为多条消息?
不确定您是否可以使用 DSL 表达这一点。但是使用 Processor API:
做到这一点相当简单builder.stream("input-topic").transform(...).to("output-topic");
您将键值存储附加到您的 transform()
并对每个输入记录执行以下操作:
- 检查store中是否有对应的键值对
- 如果是(即
store.get()!=null
),则从store中取出旧值拆分;将每个 "split record" 的值替换为null
并发出所有这些记录
- 如果是(即
- 如果输入记录
value!=null
,将输入记录按原样放入存储并拆分输入记录并发出单独的输出记录 - 如果输入记录
value==null
,从存储中删除key