How do I transform the result of R2dbcMessageSource into a new Message derived from the query result?
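I'm using R2dbcMessageSource to query a table for one item, and I want to use one of its columns to create a message to send to a message handler.
The result of the query is a Message<Mono<Event>>, which makes sense. I want to take event.getDetails and create a message from it.
Using a DirectChannel and a Transformer, I tried something like this: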
@Bean
@Transformer(inputChannel = "fromR2dbcChannel", outputChannel = "fromTransformer")
public GenericTransformer<Message<Mono<Event>>, Message<String>> monoToString() {
    return message -> {
        Mono<Event> payload = message.getPayload();
        // Blocks the calling thread until the query result arrives
        final Event event = payload.block();
        if (event == null) {
            return null;
        }
        return MessageBuilder
                .withPayload(event.getDetails())
                .setHeader("foo", "baz")
                .build();
    };
}
Of course, I'd need to put this on its own executor to avoid tying up the thread, but it doesn't work anyway: it throws a class cast exception.
error occurred during processing message in 'MethodInvokingMessageProcessor' [org.springframework.integration.handler.MethodInvokingMessageProcessor@1651b34e]; nested exception is java.lang.ClassCastException: reactor.core.publisher.MonoOnAssembly cannot be cast to org.springframework.messaging.Message, failedMessage=GenericMessage [payload=Mono.flatMap
This seems to be because the Transformer is evaluated at assembly time.
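Another possible contributing factor, offered as an assumption rather than a confirmed diagnosis: a lambda does not keep its generic parameter type at runtime, so a caller that hands it the payload instead of the whole Message fails on the lambda's implicit cast. The plain-Java sketch below reproduces that effect without Spring. `Envelope` and `invokeWithPayloadOnly` are hypothetical stand-ins, not framework types.

```java
import java.util.function.Function;

public class ErasureDemo {

    // Hypothetical stand-in for org.springframework.messaging.Message.
    static final class Envelope<T> {
        final T payload;

        Envelope(T payload) {
            this.payload = payload;
        }
    }

    // Invokes the function through its erased signature, as a reflective caller would.
    @SuppressWarnings({"rawtypes", "unchecked"})
    static String invokeWithPayloadOnly(Function fn, Object argument) {
        try {
            return String.valueOf(fn.apply(argument));
        }
        catch (ClassCastException ex) {
            return "ClassCastException";
        }
    }

    public static void main(String[] args) {
        Function<Envelope<String>, String> toUpper = envelope -> envelope.payload.toUpperCase();

        // Passing the bare payload trips the implicit cast to Envelope.
        System.out.println(invokeWithPayloadOnly(toUpper, "details"));
        // Passing the wrapper type the lambda was written for succeeds.
        System.out.println(invokeWithPayloadOnly(toUpper, new Envelope<>("details")));
    }
}
```

If this is what happens here, the Mono payload would be the argument that cannot be cast to Message, which would match the exception text above.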
Using a FluxMessageChannel seems odd since the source itself isn't reactive, but it does let me transform the payload with a service activator.
@Bean
@ServiceActivator(inputChannel = "fromR2dbcChannel", reactive = @Reactive("publishMessageWithString"))
public MessageHandler toValidator() {
    return message -> validationChannel().send(message);
}

@Bean
public Function<Flux<Message<Mono<Event>>>, Flux<Message<String>>> publishMessageWithString() {
    return flux -> flux
            .as(toMessageWithString());
}

private Function<Flux<Message<Mono<Event>>>, Flux<Message<String>>> toMessageWithString() {
    return messageFlux -> messageFlux.map(message -> {
        final Mono<Event> payload = message.getPayload();
        if (payload != null) {
            // Still blocks inside the reactive pipeline
            final Event event = payload.block();
            if (event != null) {
                return MessageBuilder
                        .withPayload(event.getDetails())
                        .setHeader("foo", "baz")
                        .build();
            }
        }
        // Note: Reactor's map() rejects a null result with a NullPointerException
        return null;
    });
}
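In real life, the message handler attached to the service activator doesn't understand Flux itself; it's a KinesisMessageHandler.
While this seems to work, it feels awkward. Is there a better way to do this transformation that is Reactor-friendly? Or an adapter that subscribes and invokes the handler? Passing a Message<Mono<Event>> end to end seems reasonable, so the latter may not be appropriate. Thanks!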
Update
Thanks to Artem, here is a working test:
import java.time.Instant;
import java.util.Arrays;
import java.util.List;
import io.r2dbc.h2.H2ConnectionConfiguration;
import io.r2dbc.h2.H2ConnectionFactory;
import io.r2dbc.spi.ConnectionFactory;
import lombok.Getter;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.data.annotation.Id;
import org.springframework.data.r2dbc.config.AbstractR2dbcConfiguration;
import org.springframework.data.r2dbc.core.R2dbcEntityTemplate;
import org.springframework.data.r2dbc.dialect.H2Dialect;
import org.springframework.data.r2dbc.repository.config.EnableR2dbcRepositories;
import org.springframework.data.relational.core.mapping.Table;
import org.springframework.integration.annotation.BridgeTo;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.FluxMessageChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.r2dbc.inbound.R2dbcMessageSource;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.r2dbc.core.DatabaseClient;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import static org.assertj.core.api.Assertions.assertThat;

@Slf4j
@SpringJUnitConfig
@DirtiesContext
public class R2dbcTest {

    @Autowired
    DatabaseClient client;

    R2dbcEntityTemplate entityTemplate;

    @Autowired
    QueueChannel validationChannel;

    @Autowired
    FluxMessageChannel fluxChannel;

    @BeforeEach
    public void setup() {
        this.entityTemplate = new R2dbcEntityTemplate(this.client, H2Dialect.INSTANCE);
        List<String> statements =
                Arrays.asList(
                        "DROP TABLE IF EXISTS events;",
                        "CREATE TABLE events (id INT AUTO_INCREMENT NOT NULL, details VARCHAR2 NOT NULL, timestamp TIMESTAMP NOT NULL);");
        statements.forEach(it -> this.client.sql(it)
                .fetch()
                .rowsUpdated()
                .as(StepVerifier::create)
                .expectNextCount(1)
                .verifyComplete());
    }

    @Test
    public void validateSuccessfulIntegrationFlow() throws InterruptedException {
        this.entityTemplate.insert(new Event(Instant.now(), "Event details"))
                .then()
                .as(StepVerifier::create)
                .verifyComplete();
        // Validate string
        final Message<?> message = validationChannel.receive();
        assertThat(message.getPayload()).isEqualTo("Event details");
        assertThat(message.getHeaders()).containsKey("foo");
    }

    @Import(R2dbcDatabaseConfiguration.class)
    @Configuration
    @EnableIntegration
    static class SpringIntegrationConfiguration {

        @Autowired
        R2dbcEntityTemplate r2dbcEntityTemplate;

        @Bean
        FluxMessageChannel fromR2dbcChannel() {
            return new FluxMessageChannel();
        }

        @BridgeTo(value = "validationChannel")
        @Bean
        FluxMessageChannel fluxChannel() {
            return new FluxMessageChannel();
        }

        @Bean
        QueueChannel validationChannel() {
            return new QueueChannel();
        }

        @ServiceActivator(inputChannel = "fromR2dbcChannel", outputChannel = "fluxChannel", async = "true")
        public Mono<Message<String>> transformEvent(Mono<Event> eventMono) {
            return eventMono.map(event ->
                    MessageBuilder
                            .withPayload(event.getDetails())
                            .setHeader("foo", "baz")
                            .build());
        }

        // Cron expression is only here because Spring environment is fully initialized before test
        // creates table, so wait for the test to start.
        @Bean
        @InboundChannelAdapter(value = "fromR2dbcChannel", poller = @Poller(cron = "30/2 * * * * *"))
        public R2dbcMessageSource r2dbcMessageSourceSelectOne(R2dbcEntityTemplate r2dbcEntityTemplate) {
            R2dbcMessageSource r2dbcMessageSource = new R2dbcMessageSource(r2dbcEntityTemplate,
                    "SELECT * FROM events LIMIT 1");
            r2dbcMessageSource.setPayloadType(Event.class);
            r2dbcMessageSource.setExpectSingleResult(true);
            return r2dbcMessageSource;
        }
    }

    @Configuration
    @EnableR2dbcRepositories(basePackages = "org.springframework.integration.r2dbc.repository")
    static class R2dbcDatabaseConfiguration extends AbstractR2dbcConfiguration {

        @Bean
        @Override
        public ConnectionFactory connectionFactory() {
            return createConnectionFactory();
        }

        public ConnectionFactory createConnectionFactory() {
            return new H2ConnectionFactory(H2ConnectionConfiguration.builder()
                    .inMemory("r2dbc")
                    .username("sa")
                    .password("")
                    .option("DB_CLOSE_DELAY=-1").build());
        }

        @Bean
        public DatabaseClient databaseClient(ConnectionFactory connectionFactory) {
            return DatabaseClient.create(connectionFactory);
        }

        @Bean
        public R2dbcEntityTemplate r2dbcEntityTemplate(DatabaseClient databaseClient) {
            return new R2dbcEntityTemplate(databaseClient, H2Dialect.INSTANCE);
        }
    }

    @Table("events")
    @Getter
    @Setter
    @RequiredArgsConstructor
    static class Event {

        @Id
        private Integer id;

        @NonNull
        public Instant timestamp;

        @NonNull
        public String details;
    }
}
I'd suggest rethinking how you view reactive streams and starting to avoid calling .block() manually.
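As a minimal sketch of what avoiding .block() can look like, assuming reactor-core is on the classpath: the value is transformed inside the Mono with map, and nothing runs until a subscriber arrives. The `Event` class here is a simplified, hypothetical stand-in for the entity in the question, and `toDetails` is an illustrative name.

```java
import reactor.core.publisher.Mono;

public class NonBlockingMapDemo {

    // Simplified stand-in for the question's Event entity.
    static final class Event {
        private final String details;

        Event(String details) {
            this.details = details;
        }

        String getDetails() {
            return details;
        }
    }

    // Derives the details without blocking: the mapping is only declared here
    // and runs when something subscribes to the returned Mono.
    static Mono<String> toDetails(Mono<Event> eventMono) {
        return eventMono.map(Event::getDetails);
    }

    public static void main(String[] args) {
        Mono<Event> queryResult = Mono.just(new Event("Event details"));

        // In an integration flow the framework would subscribe; here it is done by hand.
        toDetails(queryResult).subscribe(System.out::println);
    }
}
```

An empty Mono simply completes without emitting, so no null check is needed in the mapping step.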
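Given the requirements of your whole flow, it really is better to make fromR2dbcChannel a FluxMessageChannel, so that the Mono coming from R2DBC is subscribed to and processed smoothly by the framework internally whenever it has data.
Your @Transformer(inputChannel = "fromR2dbcChannel", outputChannel = "fromTransformer") could then deal with just a plain Event as its input parameter. Your KinesisMessageHandler can then deal nicely with whatever you send to its input channel in the payload, taken from event.getDetails().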
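Update
So, my bad. Regardless of the channel used for the @InboundChannelAdapter, it still produces a Mono in the payload of the message. From here on, the channel type really doesn't matter. At the same time, you can make validationChannel a FluxMessageChannel, and then your transformer must be changed to a service activator: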
@ServiceActivator(inputChannel = "fromR2dbcChannel", outputChannel = "validationChannel", async = "true")
public Mono<Message<String>> transformEvent(Mono<Event> eventMono) {
    // Map inside the Mono instead of blocking for the Event
    return eventMono.map(event ->
            MessageBuilder
                    .withPayload(event.getDetails())
                    .setHeader("foo", "baz")
                    .build());
}
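This way the Mono will be subscribed to by the FluxMessageChannel, and the rest of the "blocking" flow should stay the same.
The problem with a transformer is that it returns a Message if the result of the POJO method is not a Message, so the Mono would be wrapped as a payload instead of being subscribed to.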