试图理解 Kafka 的 KStream 概念
Trying to understand Kafka's KStream concept
@StreamListener("notification-input-channel")
@SendTo("notification-output-channel")
public KStream<String,Notification> process(KStream<String,PosInvoice> input)
{
KStream<String,Notification> notificationStream=input.
filter(k,v)->v.getCustomerType.equals("Prime").
mapValues(v->recordBuilder.getNotification(v);
return notificationStream;
}
首先,我对卡夫卡完全陌生。现在我的问题是在输入 KStream 中,我将值作为 String 和 Kafka,然后将其转换为 PosInvoice 对象。但是在 notificationStream 中,String 作为键的目的是什么?返回 KStream 对象的目的是什么?据我所知,当我必须模拟地从主题中读取并写入主题时,需要 KStreams。你能解释一下代码的数据流吗?
您永远不会修改键,因此它们的值是什么并不重要。它们用于将输出分区到 @SendTo
主题。您还在某处定义了一个键 StringSerde
以从 notification-input-channel
主题中读取。
返回 KStream 的目的是因为这是方法签名所期望的,Spring 的生命周期挂钩将构建到 Kafka Streams 拓扑中。
@StreamListener("notification-input-channel")
@SendTo("notification-output-channel")
public KStream<String,Notification> process(KStream<String,PosInvoice> input)
{
KStream<String,Notification> notificationStream=input.
filter(k,v)->v.getCustomerType.equals("Prime").
mapValues(v->recordBuilder.getNotification(v);
return notificationStream;
}
首先,我对卡夫卡完全陌生。现在我的问题是在输入 KStream 中,我将值作为 String 和 Kafka,然后将其转换为 PosInvoice 对象。但是在 notificationStream 中,String 作为键的目的是什么?返回 KStream 对象的目的是什么?据我所知,当我必须模拟地从主题中读取并写入主题时,需要 KStreams。你能解释一下代码的数据流吗?
您永远不会修改键,因此它们的值是什么并不重要。它们用于将输出分区到 @SendTo
主题。您还在某处定义了一个键 StringSerde
以从 notification-input-channel
主题中读取。
返回 KStream 的目的是因为这是方法签名所期望的,Spring 的生命周期挂钩将构建到 Kafka Streams 拓扑中。