如何定义 StreamsBuilderFactoryBean 的两个实例
how to define two instances of StreamsBuilderFactoryBean
我是 Spring Kafka 的新手。出于某种原因,我想创建两个 StreamsBuilderFactoryBean。如您所见,我定义了两个 StreamsBuilderFactoryBean:一个名为“commonDSLBuilder”,另一个名为“propertyDSLBuilder”,后者额外设置了 props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4)。这样“commonDSLBuilder”只创建一个消费者,而“propertyDSLBuilder”创建四个消费者。
/**
 * Spring configuration declaring two separate Kafka Streams builder factories.
 *
 * <p>{@code commonDSLBuilder} uses only the shared base properties, while
 * {@code propertyDSLBuilder} additionally sets {@code num.stream.threads} to 4 and
 * is created with a {@link CleanupConfig} whose two flags are both enabled.
 *
 * <p>Neither factory calls {@code setSingleton(false)}: a non-singleton factory hands
 * out a new {@code StreamsBuilder} on every {@code getObject()} call, so the builder
 * injected into stream definitions would not be the instance the factory itself uses
 * when it builds the topology.
 *
 * <p>NOTE(review): both factories share the same {@code application.id}. Kafka Streams
 * uses that id as the consumer group, so two different topologies under one id may
 * interfere with each other - confirm whether each factory should get its own id.
 */
@Configuration
@EnableKafka
public class KafkaStreamsConfig {
    private static final Logger log = LoggerFactory.getLogger(KafkaStreamsConfig.class);

    /** Application id read from {@code spring.kafka.stream.application-id}. */
    @Value("${spring.kafka.stream.application-id}")
    private String applicationId;

    /**
     * Factory for the "common" streams builder, configured with the base properties only.
     *
     * @return the factory bean registered as {@code commonDSLBuilder}
     */
    @Bean(name = "commonDSLBuilder")
    public StreamsBuilderFactoryBean commonDSLBuilder() {
        StreamsConfig streamsConfig = new StreamsConfig(baseStreamsProperties());
        return new StreamsBuilderFactoryBean(streamsConfig);
    }

    /**
     * Factory for the "property" streams builder: base properties plus four stream
     * threads, with cleanup enabled through {@link CleanupConfig}.
     *
     * @return the factory bean registered as {@code propertyDSLBuilder}
     */
    @Bean(name = "propertyDSLBuilder")
    public StreamsBuilderFactoryBean propertyDSLBuilder() {
        Map<String, Object> props = baseStreamsProperties();
        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4);
        StreamsConfig streamsConfig = new StreamsConfig(props);
        CleanupConfig cleanupConfig = new CleanupConfig(Boolean.TRUE, Boolean.TRUE);
        return new StreamsBuilderFactoryBean(streamsConfig, cleanupConfig);
    }

    /** Builds a fresh, mutable map holding the properties common to both factories. */
    private Map<String, Object> baseStreamsProperties() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        return props;
    }
}
我这样使用“commonDSLBuilder”:
/**
 * Configuration that defines the {@code bindPostKStream} bean: a stream over the
 * {@code "test"} topic whose records are written to the log.
 */
@Configuration
public class BindPostDSL {
    private static final Logger log = LoggerFactory.getLogger(BindPostDSL.class);

    /** Streams builder injected via the {@code commonDSLBuilder} qualifier. */
    @Autowired
    @Qualifier("commonDSLBuilder")
    private StreamsBuilder builder;

    /**
     * Creates the stream for the {@code "test"} topic and attaches a terminal
     * {@code foreach} that logs each record's key and value.
     *
     * @return the stream obtained from the injected builder
     */
    @Bean(name = "bindPostKStream")
    public KStream<String, String> kStream() {
        log.info("bind 事件处理启动");
        final KStream<String, String> bindEvents = builder.stream("test");
        bindEvents.foreach((key, value) ->
                log.info("receive kafka bind post,key:{},value:{}", key, value));
        return bindEvents;
    }
}
但是当我启动应用程序时,却创建了 5 个消费者(我猜 1 个来自 commonDSLBuilder,4 个来自 propertyDSLBuilder)。我该如何解决这个问题?
2018-08-06 10:34:12 [test-streams-827fcc9b-3b9a-46f3-a941-961033cdb2cf-StreamThread-1] INFO StreamThread:336 - stream-thread [test-streams-827fcc9b-3b9a-46f3-a941-961033cdb2cf-StreamThread-1] Starting
2018-08-06 10:34:13 [test-streams-579948de-2e4a-4af4-acbe-542304a95167-StreamThread-2] INFO StreamThread:336 - stream-thread [test-streams-579948de-2e4a-4af4-acbe-542304a95167-StreamThread-2] Starting
2018-08-06 10:34:13 [test-streams-579948de-2e4a-4af4-acbe-542304a95167-StreamThread-4] INFO StreamThread:336 - stream-thread [test-streams-579948de-2e4a-4af4-acbe-542304a95167-StreamThread-4] Starting
2018-08-06 10:34:13 [test-streams-579948de-2e4a-4af4-acbe-542304a95167-StreamThread-3] INFO StreamThread:336 - stream-thread [test-streams-579948de-2e4a-4af4-acbe-542304a95167-StreamThread-3] Starting
2018-08-06 10:34:13 [test-streams-579948de-2e4a-4af4-acbe-542304a95167-StreamThread-5] INFO StreamThread:336 - stream-thread [test-streams-579948de-2e4a-4af4-acbe-542304a95167-StreamThread-5] Starting
你的方向是正确的。
你需要两个 StreamsBuilderFactoryBean bean 和两个 KStream bean。每个 KStream 都对应一个特定的 StreamsBuilderFactoryBean。您不需要在 streamsBuilder 上调用 setSingleton(Boolean.FALSE)。
/**
 * Bean method exposing a {@code StreamsBuilderFactoryBean} through the
 * {@code FactoryBean<StreamsBuilder>} type. Note that no
 * {@code setSingleton(Boolean.FALSE)} call is made on the factory.
 */
@Bean
public FactoryBean<StreamsBuilder> commonDSLBuilder() {
// Elided in the answer; streamsConfig used below is expected to be created here.
...
StreamsBuilderFactoryBean streamsBuilder = new StreamsBuilderFactoryBean(streamsConfig);
return streamsBuilder;
}
/**
 * Bean method exposing a second {@code StreamsBuilderFactoryBean}, this one built
 * from both a streams config and a cleanup config. As with the other factory, no
 * {@code setSingleton(Boolean.FALSE)} call is made.
 */
@Bean
public FactoryBean<StreamsBuilder> propertyDSLBuilder() {
// Elided in the answer; streamsConfig and cleanupConfig are expected to be created here.
...
StreamsBuilderFactoryBean streamsBuilder = new StreamsBuilderFactoryBean(streamsConfig, cleanupConfig);
return streamsBuilder;
}
/**
 * Defines a stream over the {@code "commonTopicName"} topic using the
 * {@code StreamsBuilder} passed in as the {@code commonDSLBuilder} parameter.
 *
 * NOTE(review): presumably the parameter name is what selects the matching builder
 * bean when two StreamsBuilder beans exist - verify, or add an explicit qualifier.
 */
@Bean
public KStream<String, String> bindKStream(StreamsBuilder commonDSLBuilder) {
KStream<String, String> kStream = commonDSLBuilder.stream("commonTopicName");
// Per-record handling is elided in the answer.
kStream.foreach((key, value) -> { ... });
return kStream;
}
/**
 * Defines a stream over the {@code "propertyTopicName"} topic using the
 * {@code StreamsBuilder} passed in as the {@code propertyDSLBuilder} parameter.
 *
 * NOTE(review): the method name looks like a typo for "propertyKStream"; it is left
 * untouched here because renaming it would also change the default bean name.
 */
@Bean
public KStream<String, String> perperyKStream(StreamsBuilder propertyDSLBuilder) {
KStream<String, String> kStream = propertyDSLBuilder.stream("propertyTopicName");
// Per-record handling is elided in the answer.
kStream.foreach((key, value) -> { ... });
return kStream;
}
我是 Spring Kafka 的新手。出于某种原因,我想创建两个 StreamsBuilderFactoryBean。如您所见,我定义了两个 StreamsBuilderFactoryBean:一个名为“commonDSLBuilder”,另一个名为“propertyDSLBuilder”,后者额外设置了 props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4)。这样“commonDSLBuilder”只创建一个消费者,而“propertyDSLBuilder”创建四个消费者。
/**
 * Spring configuration declaring two separate Kafka Streams builder factories.
 *
 * <p>{@code commonDSLBuilder} uses only the shared base properties, while
 * {@code propertyDSLBuilder} additionally sets {@code num.stream.threads} to 4 and
 * is created with a {@link CleanupConfig} whose two flags are both enabled.
 *
 * <p>Neither factory calls {@code setSingleton(false)}: a non-singleton factory hands
 * out a new {@code StreamsBuilder} on every {@code getObject()} call, so the builder
 * injected into stream definitions would not be the instance the factory itself uses
 * when it builds the topology.
 *
 * <p>NOTE(review): both factories share the same {@code application.id}. Kafka Streams
 * uses that id as the consumer group, so two different topologies under one id may
 * interfere with each other - confirm whether each factory should get its own id.
 */
@Configuration
@EnableKafka
public class KafkaStreamsConfig {
    private static final Logger log = LoggerFactory.getLogger(KafkaStreamsConfig.class);

    /** Application id read from {@code spring.kafka.stream.application-id}. */
    @Value("${spring.kafka.stream.application-id}")
    private String applicationId;

    /**
     * Factory for the "common" streams builder, configured with the base properties only.
     *
     * @return the factory bean registered as {@code commonDSLBuilder}
     */
    @Bean(name = "commonDSLBuilder")
    public StreamsBuilderFactoryBean commonDSLBuilder() {
        StreamsConfig streamsConfig = new StreamsConfig(baseStreamsProperties());
        return new StreamsBuilderFactoryBean(streamsConfig);
    }

    /**
     * Factory for the "property" streams builder: base properties plus four stream
     * threads, with cleanup enabled through {@link CleanupConfig}.
     *
     * @return the factory bean registered as {@code propertyDSLBuilder}
     */
    @Bean(name = "propertyDSLBuilder")
    public StreamsBuilderFactoryBean propertyDSLBuilder() {
        Map<String, Object> props = baseStreamsProperties();
        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4);
        StreamsConfig streamsConfig = new StreamsConfig(props);
        CleanupConfig cleanupConfig = new CleanupConfig(Boolean.TRUE, Boolean.TRUE);
        return new StreamsBuilderFactoryBean(streamsConfig, cleanupConfig);
    }

    /** Builds a fresh, mutable map holding the properties common to both factories. */
    private Map<String, Object> baseStreamsProperties() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        return props;
    }
}
我这样使用“commonDSLBuilder”:
/**
 * Configuration that defines the {@code bindPostKStream} bean: a stream over the
 * {@code "test"} topic whose records are written to the log.
 */
@Configuration
public class BindPostDSL {
    private static final Logger log = LoggerFactory.getLogger(BindPostDSL.class);

    /** Streams builder injected via the {@code commonDSLBuilder} qualifier. */
    @Autowired
    @Qualifier("commonDSLBuilder")
    private StreamsBuilder builder;

    /**
     * Creates the stream for the {@code "test"} topic and attaches a terminal
     * {@code foreach} that logs each record's key and value.
     *
     * @return the stream obtained from the injected builder
     */
    @Bean(name = "bindPostKStream")
    public KStream<String, String> kStream() {
        log.info("bind 事件处理启动");
        final KStream<String, String> bindEvents = builder.stream("test");
        bindEvents.foreach((key, value) ->
                log.info("receive kafka bind post,key:{},value:{}", key, value));
        return bindEvents;
    }
}
但是当我启动应用程序时,却创建了 5 个消费者(我猜 1 个来自 commonDSLBuilder,4 个来自 propertyDSLBuilder)。我该如何解决这个问题?
2018-08-06 10:34:12 [test-streams-827fcc9b-3b9a-46f3-a941-961033cdb2cf-StreamThread-1] INFO StreamThread:336 - stream-thread [test-streams-827fcc9b-3b9a-46f3-a941-961033cdb2cf-StreamThread-1] Starting
2018-08-06 10:34:13 [test-streams-579948de-2e4a-4af4-acbe-542304a95167-StreamThread-2] INFO StreamThread:336 - stream-thread [test-streams-579948de-2e4a-4af4-acbe-542304a95167-StreamThread-2] Starting
2018-08-06 10:34:13 [test-streams-579948de-2e4a-4af4-acbe-542304a95167-StreamThread-4] INFO StreamThread:336 - stream-thread [test-streams-579948de-2e4a-4af4-acbe-542304a95167-StreamThread-4] Starting
2018-08-06 10:34:13 [test-streams-579948de-2e4a-4af4-acbe-542304a95167-StreamThread-3] INFO StreamThread:336 - stream-thread [test-streams-579948de-2e4a-4af4-acbe-542304a95167-StreamThread-3] Starting
2018-08-06 10:34:13 [test-streams-579948de-2e4a-4af4-acbe-542304a95167-StreamThread-5] INFO StreamThread:336 - stream-thread [test-streams-579948de-2e4a-4af4-acbe-542304a95167-StreamThread-5] Starting
你的方向是正确的。
你需要两个 StreamsBuilderFactoryBean bean 和两个 KStream bean。每个 KStream 都对应一个特定的 StreamsBuilderFactoryBean。您不需要在 streamsBuilder 上调用 setSingleton(Boolean.FALSE)。
/**
 * Bean method exposing a {@code StreamsBuilderFactoryBean} through the
 * {@code FactoryBean<StreamsBuilder>} type. Note that no
 * {@code setSingleton(Boolean.FALSE)} call is made on the factory.
 */
@Bean
public FactoryBean<StreamsBuilder> commonDSLBuilder() {
// Elided in the answer; streamsConfig used below is expected to be created here.
...
StreamsBuilderFactoryBean streamsBuilder = new StreamsBuilderFactoryBean(streamsConfig);
return streamsBuilder;
}
/**
 * Bean method exposing a second {@code StreamsBuilderFactoryBean}, this one built
 * from both a streams config and a cleanup config. As with the other factory, no
 * {@code setSingleton(Boolean.FALSE)} call is made.
 */
@Bean
public FactoryBean<StreamsBuilder> propertyDSLBuilder() {
// Elided in the answer; streamsConfig and cleanupConfig are expected to be created here.
...
StreamsBuilderFactoryBean streamsBuilder = new StreamsBuilderFactoryBean(streamsConfig, cleanupConfig);
return streamsBuilder;
}
/**
 * Defines a stream over the {@code "commonTopicName"} topic using the
 * {@code StreamsBuilder} passed in as the {@code commonDSLBuilder} parameter.
 *
 * NOTE(review): presumably the parameter name is what selects the matching builder
 * bean when two StreamsBuilder beans exist - verify, or add an explicit qualifier.
 */
@Bean
public KStream<String, String> bindKStream(StreamsBuilder commonDSLBuilder) {
KStream<String, String> kStream = commonDSLBuilder.stream("commonTopicName");
// Per-record handling is elided in the answer.
kStream.foreach((key, value) -> { ... });
return kStream;
}
/**
 * Defines a stream over the {@code "propertyTopicName"} topic using the
 * {@code StreamsBuilder} passed in as the {@code propertyDSLBuilder} parameter.
 *
 * NOTE(review): the method name looks like a typo for "propertyKStream"; it is left
 * untouched here because renaming it would also change the default bean name.
 */
@Bean
public KStream<String, String> perperyKStream(StreamsBuilder propertyDSLBuilder) {
KStream<String, String> kStream = propertyDSLBuilder.stream("propertyTopicName");
// Per-record handling is elided in the answer.
kStream.foreach((key, value) -> { ... });
return kStream;
}