如何通过 spring 集成向 kafka 通道和 jdbc 发送消息?
how to send message to both kafka channel and jdbc with spring integration?
我们正在尝试从 jdbc 源进行轮询,聚合消息,然后将聚合器的输出发送到 kafka 流,然后发送到 jdbcMessageHandler(更新我们轮询的行,这样它们就不会被再次轮询。它实际上是一个单独的 table)。我们正在使用 IntegrationFlow DSL。轮询和聚合工作正常。多个 outputs/sources/channels/handlers 的输出是无效的。
我们目前的流程是这样的:
IntegrationFlowBuilder flowBuilder = IntegrationFlows.from(jdbcMessageSource(), new Consumer<SourcePollingChannelAdapterSpec>(){
@Override
public void accept(SourcePollingChannelAdapterSpec sourcePollingChannelAdapterSpec) {
sourcePollingChannelAdapterSpec.poller(poller);
}
})
.split()
.transform((message)-> {
// do our stuff; output is a simple POJO representing a single row from the db
})
.aggregate(aggregator -> aggregator) // details spared but it works fine; output is a larger POJO containing a collection of the row objects
.channel(this.source.output()) // this goes to kafka
.handle(jdbcMessageHandler()); // doesn't get here
这是我们的投票来源:
@Bean
public MessageSource<Object> jdbcMessageSource() {
JdbcPollingChannelAdapter jdbcPollingChannelAdapter = new JdbcPollingChannelAdapter(this.dataSource, this.properties.getQuery());
jdbcPollingChannelAdapter.setSelectSqlParameterSource(createSqlParameterSource());
return jdbcPollingChannelAdapter;
}
jdbcMessageHandler 就是这样:
@Bean
public MessageHandler jdbcMessageHandler() {
return new JdbcMessageHandler(dataSource, this.properties.getUpdate());
}
this.output 就是这样:
@Autowired
private Source source;
它通过@EnableBinding(Source.class) 映射到我们 application.yml:
中的一个 kafka 主题
spring:
cloud:
stream:
bindings:
output: our.topic
我们的大部分属性都在 application.yml 文件中定义,我们使用这些属性和注释而不是 XMl 配置。
通过上述,它可以很好地写入 kafka,但不会到达 jdbcMessageHandler。
我在执行此操作时工作(在聚合器之后):
.publishSubscribeChannel(publishSubscribeSpec -> publishSubscribeSpec
.subscribe(flow -> flow
.handle(jdbcMessageHandler()))
)
.channel(this.source.output())
但这是错误的顺序;我们要确保首先将消息写入 kafka,然后更新 table 以跟踪已成功轮询的行。
jdbcMessageHandler 只是包装了一个使用消息中的值的 INSERT 语句,所以我假设有多种方法可以做到这一点。 jdbcOutboundGateway 是一种方式吗?看起来这是为了执行另一个查询和 return 一个要进一步处理的结果,这不符合我们的用例。
还建议使用轮询来更新,因为我们使用轮询作为我们的来源。我调查了这个。我不认为这也行得通,因为它似乎在轮询之后立即进行更新,甚至在轮询结果被处理之前进行更新,所以在最终聚合消息发送到 kafka 之前进行更新的时间也有同样的问题.
编辑:我尝试了下面的答案,所以当前流程是这样的:
@Bean
public IntegrationFlow pollingFlow() {
IntegrationFlowBuilder flowBuilder = IntegrationFlows.from(jdbcMessageSource(), new Consumer<SourcePollingChannelAdapterSpec>(){
@Override
public void accept(SourcePollingChannelAdapterSpec sourcePollingChannelAdapterSpec) {
sourcePollingChannelAdapterSpec.poller(poller);
}
})
.split()
.transform((message)-> {
// transform ResultSet; output is a simple POJO representing a single row from the db
})
.aggregate(aggregator -> aggregator) // details spared but it works fine; output is a larger POJO containing a collection of the row objects
.publishSubscribeChannel(publishSubscribeSpec -> publishSubscribeSpec
.subscribe(flow -> flow
.handle(this.source.output()))
.subscribe(flow -> flow
.handle(jdbcMessageHandler()))
);
}
启动时出现以下错误:
java.lang.IllegalArgumentException: Found ambiguous parameter type [interface org.springframework.messaging.MessageHandler] for method match: [public boolean org.springframework.integration.channel.AbstractSubscribableChannel.unsubscribe(org.springframework.messaging.MessageHandler), public void org.springframework.integration.channel.AbstractMessageChannel.configureMetrics(org.springframework.integration.support.management.AbstractMessageChannelMetrics), public void org.springframework.integration.channel.AbstractMessageChannel.setDatatypes(java.lang.Class[]), public org.springframework.messaging.support.ChannelInterceptor org.springframework.integration.channel.AbstractMessageChannel.removeInterceptor(int), public void org.springframework.integration.context.IntegrationObjectSupport.setApplicationContext(org.springframework.context.ApplicationContext) throws org.springframework.beans.BeansException, public boolean org.springframework.integration.channel.AbstractMessageChannel.removeInterceptor(org.springframework.messaging.support.ChannelInterceptor), public java.lang.String org.springframework.integration.context.IntegrationObjectSupport.getComponentName(), public void org.springframework.integration.channel.AbstractMessageChannel.setStatsEnabled(boolean), public void org.springframework.integration.channel.AbstractMessageChannel.setMessageConverter(org.springframework.messaging.converter.MessageConverter), public void org.springframework.integration.context.IntegrationObjectSupport.setMessageBuilderFactory(org.springframework.integration.support.MessageBuilderFactory), public void org.springframework.integration.context.IntegrationObjectSupport.setBeanFactory(org.springframework.beans.factory.BeanFactory), public void org.springframework.integration.context.IntegrationObjectSupport.setComponentName(java.lang.String), public void org.springframework.integration.channel.AbstractMessageChannel.setInterceptors(java.util.List), public final void org.springframework.integration.context.IntegrationObjectSupport.setPrimaryExpression(org.springframework.expression.Expression), public void org.springframework.integration.context.IntegrationObjectSupport.setChannelResolver(org.springframework.messaging.core.DestinationResolver)]
at org.springframework.util.Assert.isNull(Assert.java:113)
at org.springframework.integration.util.MessagingMethodInvokerHelper.findHandlerMethodsForTarget(MessagingMethodInvokerHelper.java:499)
at org.springframework.integration.util.MessagingMethodInvokerHelper.<init>(MessagingMethodInvokerHelper.java:226)
at org.springframework.integration.util.MessagingMethodInvokerHelper.<init>(MessagingMethodInvokerHelper.java:149)
at org.springframework.integration.util.MessagingMethodInvokerHelper.<init>(MessagingMethodInvokerHelper.java:144)
at org.springframework.integration.handler.MethodInvokingMessageProcessor.<init>(MethodInvokingMessageProcessor.java:60)
at org.springframework.integration.handler.ServiceActivatingHandler.<init>(ServiceActivatingHandler.java:37)
at org.springframework.integration.dsl.IntegrationFlowDefinition.handle(IntegrationFlowDefinition.java:985)
at org.springframework.integration.dsl.IntegrationFlowDefinition.handle(IntegrationFlowDefinition.java:964)
at org.springframework.integration.dsl.IntegrationFlowDefinition.handle(IntegrationFlowDefinition.java:950)
我调试并进入 IntegrationFlowDefinition 上的 .handle() 方法,发现传递的对象以某种方式转换为 DirectChannel,我不明白这一点。
将两个子流订阅到 pub/sub 频道。
.publishSubscribeChannel(publishSubscribeSpec -> publishSubscribeSpec
.subscribe(flow -> flow
.bridge(e -> e.id("bTO"))
.channel(this.source.output())
.subscribe(flow -> flow
.handle(jdbcMessageHandler()))
)
您需要 .bridge()
将 pub/sub 通道连接到输出通道。
我们正在尝试从 jdbc 源进行轮询,聚合消息,然后将聚合器的输出发送到 kafka 流,然后发送到 jdbcMessageHandler(更新我们轮询的行,这样它们就不会被再次轮询。它实际上是一个单独的 table)。我们正在使用 IntegrationFlow DSL。轮询和聚合工作正常。多个 outputs/sources/channels/handlers 的输出是无效的。
我们目前的流程是这样的:
IntegrationFlowBuilder flowBuilder = IntegrationFlows.from(jdbcMessageSource(), new Consumer<SourcePollingChannelAdapterSpec>(){
@Override
public void accept(SourcePollingChannelAdapterSpec sourcePollingChannelAdapterSpec) {
sourcePollingChannelAdapterSpec.poller(poller);
}
})
.split()
.transform((message)-> {
// do our stuff; output is a simple POJO representing a single row from the db
})
.aggregate(aggregator -> aggregator) // details spared but it works fine; output is a larger POJO containing a collection of the row objects
.channel(this.source.output()) // this goes to kafka
.handle(jdbcMessageHandler()); // doesn't get here
这是我们的投票来源:
@Bean
public MessageSource<Object> jdbcMessageSource() {
JdbcPollingChannelAdapter jdbcPollingChannelAdapter = new JdbcPollingChannelAdapter(this.dataSource, this.properties.getQuery());
jdbcPollingChannelAdapter.setSelectSqlParameterSource(createSqlParameterSource());
return jdbcPollingChannelAdapter;
}
jdbcMessageHandler 就是这样:
@Bean
public MessageHandler jdbcMessageHandler() {
return new JdbcMessageHandler(dataSource, this.properties.getUpdate());
}
this.output 就是这样:
@Autowired
private Source source;
它通过@EnableBinding(Source.class) 映射到我们 application.yml:
中的一个 kafka 主题spring:
cloud:
stream:
bindings:
output: our.topic
我们的大部分属性都在 application.yml 文件中定义,我们使用这些属性和注释而不是 XMl 配置。
通过上述,它可以很好地写入 kafka,但不会到达 jdbcMessageHandler。
我在执行此操作时工作(在聚合器之后):
.publishSubscribeChannel(publishSubscribeSpec -> publishSubscribeSpec
.subscribe(flow -> flow
.handle(jdbcMessageHandler()))
)
.channel(this.source.output())
但这是错误的顺序;我们要确保首先将消息写入 kafka,然后更新 table 以跟踪已成功轮询的行。
jdbcMessageHandler 只是包装了一个使用消息中的值的 INSERT 语句,所以我假设有多种方法可以做到这一点。 jdbcOutboundGateway 是一种方式吗?看起来这是为了执行另一个查询和 return 一个要进一步处理的结果,这不符合我们的用例。
还建议使用轮询来更新,因为我们使用轮询作为我们的来源。我调查了这个。我不认为这也行得通,因为它似乎在轮询之后立即进行更新,甚至在轮询结果被处理之前进行更新,所以在最终聚合消息发送到 kafka 之前进行更新的时间也有同样的问题.
编辑:我尝试了下面的答案,所以当前流程是这样的:
@Bean
public IntegrationFlow pollingFlow() {
IntegrationFlowBuilder flowBuilder = IntegrationFlows.from(jdbcMessageSource(), new Consumer<SourcePollingChannelAdapterSpec>(){
@Override
public void accept(SourcePollingChannelAdapterSpec sourcePollingChannelAdapterSpec) {
sourcePollingChannelAdapterSpec.poller(poller);
}
})
.split()
.transform((message)-> {
// transform ResultSet; output is a simple POJO representing a single row from the db
})
.aggregate(aggregator -> aggregator) // details spared but it works fine; output is a larger POJO containing a collection of the row objects
.publishSubscribeChannel(publishSubscribeSpec -> publishSubscribeSpec
.subscribe(flow -> flow
.handle(this.source.output()))
.subscribe(flow -> flow
.handle(jdbcMessageHandler()))
);
}
启动时出现以下错误:
java.lang.IllegalArgumentException: Found ambiguous parameter type [interface org.springframework.messaging.MessageHandler] for method match: [public boolean org.springframework.integration.channel.AbstractSubscribableChannel.unsubscribe(org.springframework.messaging.MessageHandler), public void org.springframework.integration.channel.AbstractMessageChannel.configureMetrics(org.springframework.integration.support.management.AbstractMessageChannelMetrics), public void org.springframework.integration.channel.AbstractMessageChannel.setDatatypes(java.lang.Class[]), public org.springframework.messaging.support.ChannelInterceptor org.springframework.integration.channel.AbstractMessageChannel.removeInterceptor(int), public void org.springframework.integration.context.IntegrationObjectSupport.setApplicationContext(org.springframework.context.ApplicationContext) throws org.springframework.beans.BeansException, public boolean org.springframework.integration.channel.AbstractMessageChannel.removeInterceptor(org.springframework.messaging.support.ChannelInterceptor), public java.lang.String org.springframework.integration.context.IntegrationObjectSupport.getComponentName(), public void org.springframework.integration.channel.AbstractMessageChannel.setStatsEnabled(boolean), public void org.springframework.integration.channel.AbstractMessageChannel.setMessageConverter(org.springframework.messaging.converter.MessageConverter), public void org.springframework.integration.context.IntegrationObjectSupport.setMessageBuilderFactory(org.springframework.integration.support.MessageBuilderFactory), public void org.springframework.integration.context.IntegrationObjectSupport.setBeanFactory(org.springframework.beans.factory.BeanFactory), public void org.springframework.integration.context.IntegrationObjectSupport.setComponentName(java.lang.String), public void org.springframework.integration.channel.AbstractMessageChannel.setInterceptors(java.util.List), public final void org.springframework.integration.context.IntegrationObjectSupport.setPrimaryExpression(org.springframework.expression.Expression), public void org.springframework.integration.context.IntegrationObjectSupport.setChannelResolver(org.springframework.messaging.core.DestinationResolver)]
at org.springframework.util.Assert.isNull(Assert.java:113)
at org.springframework.integration.util.MessagingMethodInvokerHelper.findHandlerMethodsForTarget(MessagingMethodInvokerHelper.java:499)
at org.springframework.integration.util.MessagingMethodInvokerHelper.<init>(MessagingMethodInvokerHelper.java:226)
at org.springframework.integration.util.MessagingMethodInvokerHelper.<init>(MessagingMethodInvokerHelper.java:149)
at org.springframework.integration.util.MessagingMethodInvokerHelper.<init>(MessagingMethodInvokerHelper.java:144)
at org.springframework.integration.handler.MethodInvokingMessageProcessor.<init>(MethodInvokingMessageProcessor.java:60)
at org.springframework.integration.handler.ServiceActivatingHandler.<init>(ServiceActivatingHandler.java:37)
at org.springframework.integration.dsl.IntegrationFlowDefinition.handle(IntegrationFlowDefinition.java:985)
at org.springframework.integration.dsl.IntegrationFlowDefinition.handle(IntegrationFlowDefinition.java:964)
at org.springframework.integration.dsl.IntegrationFlowDefinition.handle(IntegrationFlowDefinition.java:950)
我调试并进入 IntegrationFlowDefinition 上的 .handle() 方法,发现传递的对象以某种方式转换为 DirectChannel,我不明白这一点。
将两个子流订阅到 pub/sub 频道。
.publishSubscribeChannel(publishSubscribeSpec -> publishSubscribeSpec
.subscribe(flow -> flow
.bridge(e -> e.id("bTO"))
.channel(this.source.output())
.subscribe(flow -> flow
.handle(jdbcMessageHandler()))
)
您需要 .bridge()
将 pub/sub 通道连接到输出通道。