Spring-Integration XML to Java
How can I convert this code to Java configuration?
<int-kafka:outbound-channel-adapter
    id="mainOutboundChannelAdapter"
    kafka-producer-context-ref="kafkaProducerContext"
    channel="mainOutboundTopicChanel">
</int-kafka:outbound-channel-adapter>
Yes, you can. Take a look at the latest Spring Integration Java DSL.
Your case might look like this:
@Bean
public IntegrationFlow sendToKafkaFlow(String serverAddress) {
    return f -> f.<String>split(p -> FastList.newWithNValues(100, () -> p), null)
            .handle(kafkaMessageHandler(serverAddress));
}

private KafkaProducerMessageHandlerSpec kafkaMessageHandler(String serverAddress) {
    return Kafka.outboundChannelAdapter(props -> props.put("queue.buffering.max.ms", "15000"))
            .messageKey(m -> m.getHeaders().get(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER))
            .addProducer(TEST_TOPIC, serverAddress, this::producer);
}

private void producer(KafkaProducerMessageHandlerSpec.ProducerMetadataSpec metadata) {
    metadata.async(true)
            .batchNumMessages(10)
            .valueClassType(String.class)
            .<String>valueEncoder(String::getBytes)
            .keyEncoder(new IntEncoder(null));
}
UPDATE
Without lambdas, but still Spring Integration:
@Bean
@ServiceActivator(inputChannel = "mainOutboundTopicChanel")
public MessageHandler kafkaProducer() {
    return new KafkaProducerMessageHandler<String, String>(kafkaProducerContext());
}

@Bean
public KafkaProducerContext<String, String> kafkaProducerContext() {
    KafkaProducerContext<String, String> kafkaProducerContext = new KafkaProducerContext<String, String>();
    ProducerMetadata<String, String> producerMetadata = new ProducerMetadata<String, String>(TOPIC);
    producerMetadata.setValueClassType(String.class);
    producerMetadata.setKeyClassType(String.class);
    Encoder<String> encoder = new StringEncoder<String>();
    producerMetadata.setValueEncoder(encoder);
    producerMetadata.setKeyEncoder(encoder);
    producerMetadata.setAsync(true);
    Properties props = new Properties();
    props.put("queue.buffering.max.ms", "15000");
    ProducerFactoryBean<String, String> producer =
            new ProducerFactoryBean<String, String>(producerMetadata, kafkaRule.getBrokersAsString(), props);
    ProducerConfiguration<String, String> config =
            new ProducerConfiguration<String, String>(producerMetadata, producer.getObject());
    kafkaProducerContext.setProducerConfigurations(Collections.singletonMap(TOPIC, config));
    return kafkaProducerContext;
}
And don't forget to add @EnableIntegration alongside @Configuration.
For the future: any XML tag in Spring is parsed by some NamespaceHandler, in this case KafkaNamespaceHandler. Reading its source code, we find these lines:
    registerBeanDefinitionParser("outbound-channel-adapter", new KafkaOutboundChannelAdapterParser());
    registerBeanDefinitionParser("producer-context", new KafkaProducerContextParser());
Then we go to KafkaOutboundChannelAdapterParser and see that it populates a BeanDefinition:
    final BeanDefinitionBuilder kafkaProducerMessageHandlerBuilder =
            BeanDefinitionBuilder.genericBeanDefinition(KafkaProducerMessageHandler.class);
And so on through the rest of the source code.
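To make the lookup path concrete, here is a toy model of the idea in plain Java. It is only a sketch and does not use Spring's real classes: ToyNamespaceHandler, ToyParser, and beanClassFor are hypothetical names, and the mapping of "producer-context" to KafkaProducerContext is my assumption based on the Java configuration shown earlier. It mirrors the chain you follow when reading the source: element name, then registered parser, then the bean class you need to declare with @Bean.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model only: these are NOT Spring's classes, just an illustration of the lookup.
interface ToyParser {
    // Name of the bean class this parser would build a definition for.
    String beanClassName();
}

class ToyNamespaceHandler {
    // XML element name -> parser registered for that element.
    private final Map<String, ToyParser> parsers = new HashMap<>();

    ToyNamespaceHandler() {
        // Mirrors the two registerBeanDefinitionParser(...) lines quoted above.
        register("outbound-channel-adapter", () -> "KafkaProducerMessageHandler");
        // Assumption: the producer-context parser builds a KafkaProducerContext.
        register("producer-context", () -> "KafkaProducerContext");
    }

    void register(String elementName, ToyParser parser) {
        parsers.put(elementName, parser);
    }

    // Answers: which class do I need a @Bean for to replace this XML element?
    String beanClassFor(String elementName) {
        ToyParser parser = parsers.get(elementName);
        if (parser == null) {
            throw new IllegalArgumentException("No parser registered for <" + elementName + ">");
        }
        return parser.beanClassName();
    }
}
```

Calling new ToyNamespaceHandler().beanClassFor("outbound-channel-adapter") gives the class name to instantiate in Java configuration, which is exactly why the @ServiceActivator bean above returns a KafkaProducerMessageHandler.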
UPDATE 2
The Consumer part:
@Bean
@InboundChannelAdapter(value = "fromKafkaChannel",
        poller = @Poller(fixedRate = "10", maxMessagesPerPoll = "1"))
public MessageSource<Map<String, Map<Integer, List<Object>>>> kafkaMessageSource() {
    // The message source needs the consumer context defined below.
    return new KafkaHighLevelConsumerMessageSource<String, String>(kafkaConsumerContext());
}

@Bean
public KafkaConsumerContext<String, String> kafkaConsumerContext() {
    KafkaConsumerContext<String, String> kafkaConsumerContext = new KafkaConsumerContext<String, String>();
    .....
    kafkaConsumerContext.setConsumerConfigurations(map);
    return kafkaConsumerContext;
}