spring 集成 java dsl - java 7 - 在出站通道适配器中设置轮询器
spring integration java dsl - java 7 - set poller in outbound channel adapter
我正在尝试将我的一些代码从 XML 迁移到 java dsl 样式(pre-java8)。
这是我创建的 java 配置,我不知道如何设置轮询器。这些示例仅讨论全局轮询器,但我需要在适配器中设置轮询器。
@Bean
public MessageHandler kafkaMessageHandler() {
KafkaProducerMessageHandler<String, String> handler =
new KafkaProducerMessageHandler<>(kafkaTemplate());
handler.setMessageKeyExpression(new LiteralExpression("kafka-integration"));
handler.setTopicExpression(new LiteralExpression("headers.kafka_topic"));
return handler;
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(new DefaultKafkaProducerFactory<String, String>(producerConfigs()));
}
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> properties = new HashMap<>();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
// introduce a delay on the send to allow more messages to accumulate
properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
return properties;
}
我拥有的 XML 等效项如下:
<int-kafka:outbound-channel-adapter
id="kafkaOutboundChannelAdapter"
kafka-producer-context-ref="kafkaProducerContext"
channel="kafkaChannel" >
<int:poller fixed-rate="1000" max-messages-per-poll="10000}"/>
</int-kafka:outbound-channel-adapter>
参见the documentation...
@Bean
@ServiceActivator(inputChannel = "kafkaChannel" poller = @Poller(fixedDelay = "1000", ...)
public MessageHandler kafkaMessageHandler() {
...
}
我正在尝试将我的一些代码从 XML 迁移到 java dsl 样式(pre-java8)。
这是我创建的 java 配置,我不知道如何设置轮询器。这些示例仅讨论全局轮询器,但我需要在适配器中设置轮询器。
@Bean
public MessageHandler kafkaMessageHandler() {
KafkaProducerMessageHandler<String, String> handler =
new KafkaProducerMessageHandler<>(kafkaTemplate());
handler.setMessageKeyExpression(new LiteralExpression("kafka-integration"));
handler.setTopicExpression(new LiteralExpression("headers.kafka_topic"));
return handler;
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(new DefaultKafkaProducerFactory<String, String>(producerConfigs()));
}
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> properties = new HashMap<>();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
// introduce a delay on the send to allow more messages to accumulate
properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
return properties;
}
我拥有的 XML 等效项如下:
<int-kafka:outbound-channel-adapter
id="kafkaOutboundChannelAdapter"
kafka-producer-context-ref="kafkaProducerContext"
channel="kafkaChannel" >
<int:poller fixed-rate="1000" max-messages-per-poll="10000}"/>
</int-kafka:outbound-channel-adapter>
参见the documentation...
@Bean
@ServiceActivator(inputChannel = "kafkaChannel" poller = @Poller(fixedDelay = "1000", ...)
public MessageHandler kafkaMessageHandler() {
...
}