KafkaStreams 同一应用程序中的多个流
KafkaStreams multiple streams in same application
我正在尝试根据 KafkaStreams 的惯例和合理性做出实用的设计决策。
假设我有两个不同的事件要放入 KTable
中。我有一个生产者将这些消息发送到正在收听该主题的 KStream
。
据我所知,我不能对使用 KafkaStreams
的消息使用条件转发,因此如果流订阅了多个主题(例如,一个用于上述消息中的每一个)我只能调用 stream.to
在单个接收器主题上 - 否则,我将不得不在流上调用 foreach
并将带有 KProducer
的消息发送到接收器主题。
以上建议使用单个流。我以为我可以在同一个应用程序中设置多个流,每个流听一个主题,映射并转发到 table 接收器,但每次我尝试创建 KafkaStreams
的两个实例时,只有第一个被初始化订阅它的主题 - 另一个从客户端收到警告,它的主题没有订阅。
我可以在同一个应用程序中设置多个流吗?如果可以,有什么特殊要求吗?
class Stream(topic: String) {
val props: Option[Map[String, String]] = Some(TopicProps.get(topic))
val streamsBuilder = new StreamsBuilder
val topics = new util.ArrayList[String]
topics.add(props.get("topic"))
val stream: KStream[String, String] = configureStream(streamsBuilder, topics, props.get("sink"))
def configureStream(builder: StreamsBuilder, topics: java.util.List[String], sink: String): KStream[String, String] = {
builder.stream[String, String](
topics,
Consumed.`with`(String(), String())
)
}
def init(): KafkaStreams = {
val streams = new KafkaStreams(streamsBuilder.build(), KafkaConfig.streamConfig)
streams.start()
streams
}
}
class Streams() {
val eventStream = new Stream("first_event") //looking good!
val eventStream2 = new Stream("second_event") // no subscribers
//if I switch the other of these, eventStream2 is subscribed to and eventStream is dead in the water
val streams: KafkaStreams = eventStream.init()
val streams2: KafkaStreams = eventStream2.init()
}
流配置
val streamConfig: Properties = {
val properties = new Properties()
properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-application")
properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BrokerHost)
properties
}
我也喜欢建议的任何替代方案
From what I can tell I cannot use conditional forwarding for messages
您知道 KStream#split()
(KStream#branch()
顺序版本)吗?与条件转发基本相同
I thought I could set up multiple streams in the same app, each listening to a topic, mapping and forwarding to a table sink,
这应该按如下方式工作:
StreamsBuilder builder = new SteamsBuilder();
KStream stream1 = builder.stream("topic1");
KStream stream2 = builder.stream("topic2");
stream1.to("table1-topic");
stream2.to("table2-topic");
but everytime I try to create two instances of KafkaStreams, only the first initialized subscribes to its topic - the other gets a warning from the client that its topic has no subscriptions.
不确定。这应该工作。也许你可以分享你的代码?
当您创建 KafkaStreams 时,您需要使用不同的 application.id 传递 属性,例如:
props.put(StreamsConfig.APPLICATION_ID_CONFIG,"APP1");
StreamsBuilder builder = new SteamsBuilder();
KStream stream1 = builder.stream("topic1");
KafkaStreams streams = new KafkaStreams(builder, props);
streams.start();
然后你应该创建另一个流:
props.put(StreamsConfig.APPLICATION_ID_CONFIG,"APP2");
StreamsBuilder builder = new SteamsBuilder();
KStream stream2 = builder.stream("topic2");
KafkaStreams streams2 = new KafkaStreams(builder, props);
streams2.start();
我正在尝试根据 KafkaStreams 的惯例和合理性做出实用的设计决策。
假设我有两个不同的事件要放入 KTable
中。我有一个生产者将这些消息发送到正在收听该主题的 KStream
。
据我所知,我不能对使用 KafkaStreams
的消息使用条件转发,因此如果流订阅了多个主题(例如,一个用于上述消息中的每一个)我只能调用 stream.to
在单个接收器主题上 - 否则,我将不得不在流上调用 foreach
并将带有 KProducer
的消息发送到接收器主题。
以上建议使用单个流。我以为我可以在同一个应用程序中设置多个流,每个流听一个主题,映射并转发到 table 接收器,但每次我尝试创建 KafkaStreams
的两个实例时,只有第一个被初始化订阅它的主题 - 另一个从客户端收到警告,它的主题没有订阅。
我可以在同一个应用程序中设置多个流吗?如果可以,有什么特殊要求吗?
class Stream(topic: String) {
val props: Option[Map[String, String]] = Some(TopicProps.get(topic))
val streamsBuilder = new StreamsBuilder
val topics = new util.ArrayList[String]
topics.add(props.get("topic"))
val stream: KStream[String, String] = configureStream(streamsBuilder, topics, props.get("sink"))
def configureStream(builder: StreamsBuilder, topics: java.util.List[String], sink: String): KStream[String, String] = {
builder.stream[String, String](
topics,
Consumed.`with`(String(), String())
)
}
def init(): KafkaStreams = {
val streams = new KafkaStreams(streamsBuilder.build(), KafkaConfig.streamConfig)
streams.start()
streams
}
}
class Streams() {
val eventStream = new Stream("first_event") //looking good!
val eventStream2 = new Stream("second_event") // no subscribers
//if I switch the other of these, eventStream2 is subscribed to and eventStream is dead in the water
val streams: KafkaStreams = eventStream.init()
val streams2: KafkaStreams = eventStream2.init()
}
流配置
val streamConfig: Properties = {
val properties = new Properties()
properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-application")
properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BrokerHost)
properties
}
我也喜欢建议的任何替代方案
From what I can tell I cannot use conditional forwarding for messages
您知道 KStream#split()
(KStream#branch()
顺序版本)吗?与条件转发基本相同
I thought I could set up multiple streams in the same app, each listening to a topic, mapping and forwarding to a table sink,
这应该按如下方式工作:
StreamsBuilder builder = new SteamsBuilder();
KStream stream1 = builder.stream("topic1");
KStream stream2 = builder.stream("topic2");
stream1.to("table1-topic");
stream2.to("table2-topic");
but everytime I try to create two instances of KafkaStreams, only the first initialized subscribes to its topic - the other gets a warning from the client that its topic has no subscriptions.
不确定。这应该工作。也许你可以分享你的代码?
当您创建 KafkaStreams 时,您需要使用不同的 application.id 传递 属性,例如:
props.put(StreamsConfig.APPLICATION_ID_CONFIG,"APP1");
StreamsBuilder builder = new SteamsBuilder();
KStream stream1 = builder.stream("topic1");
KafkaStreams streams = new KafkaStreams(builder, props);
streams.start();
然后你应该创建另一个流:
props.put(StreamsConfig.APPLICATION_ID_CONFIG,"APP2");
StreamsBuilder builder = new SteamsBuilder();
KStream stream2 = builder.stream("topic2");
KafkaStreams streams2 = new KafkaStreams(builder, props);
streams2.start();