Spring 不同消费者的每条消息的 Cloud Stream 主题(在一个 Comsumer 应用程序中)
Spring Cloud Stream topic per message for different consumers (in one Comsumer app)
这个问题类似于,但不同的是我想在一个消费者spring引导应用程序中使用多个接收器,我想通过rabbitmq主题(默认情况下在spring云流)。我无法找出正确的配置或代码中的错误。我有 3 sinks/cosumers。 consumer1 是默认值,每条消息都会发送到那里。
**根据 Garry 的建议更新**
评论:我的 Producer App 有路由键='*.events'
application.yml
spring:
cloud:
stream:
bindings:
output:
destination: my-exchange
rabbit:
bindings:
output:
producer:
routing-key-expression: headers['*.events']
application:
name: publisher-service
server:
port: 15010
生产者代码片段
Comment:message 与路由键 ="test.events" 一起发送。我不确定第二个参数,但我假设它是 bindingrouting-key =test1.events.billing 这意味着除了默认消费者之外,我希望它被交付给计费消费者。
source.output().send(MessageBuilder.withPayload(eventRequest.getEventMessage())
.setHeader("*.events", "test1.events.billing")
.build());
消费者配置
评论:我想要 3 个队列分配给 exchange ="myexchange" 。我不确定配置是否正确。
application.yml
spring:
cloud:
stream:
bindings:
defaultconsumer:
destination: my-exchange
group: queue1
billingconsumer:
destination: my-exchange
group: queue2
messageconsumer:
destination: my-exchange
group: queue3
rabbit:
bindings:
defaultconsumer:
consumer:
bindingRoutingKey: '*.events.#'
billingconsumer:
consumer:
bindingRoutingKey: test1.events.billing
messageconsumer:
consumer:
bindingRoutingKey: test2.events.messages
application:
name: subscriber-service
server:
port: 15020
消费者代码:
IEventConsumer.java
评论:我不确定下面的代码是否正确
public interface IEventConsumer {
String INPUT = "my-exchange";
@Input
SubscribableChannel defaultconsumer();
@Input
SubscribableChannel billingconsumer();
@Input
SubscribableChannel messageconsumer();
}
EventConsumer.java
评论:我想要的只是下面的消息不应该收到我的 messsageConsumer!
但实际上它通过所有这些方法。
@StreamListener("defaultconsumer")
public void subscribe1(EventMessage eventMessage) {
logger.info(" DefaultEventConsumer received new event [" + eventMessage.toString() + "] ");
}
@StreamListener("billingconsumer")
public void subscribe2(EventMessage eventMessage) {
logger.info(" billingEventConsumer received new event [" + eventMessage.toString() + "] ");
}
@StreamListener("messageconsumer")
public void subscribe3(EventMessage eventMessage) {
logger.info(" messageEventConsumer received new event [" + eventMessage.toString() + "] ");
}
上面显然有问题,我看不到这个工作。有什么想法吗?
@Input(INPUT)
SubscribableChannel defaultconsumer();
@Input(INPUT)
SubscribableChannel billingconsumer();
@Input(INPUT)
SubscribableChannel messageconsumer();
您为所有三个绑定指定了相同的名称;只需使用 @INPUT
,方法名称将用作绑定名称。
和
@StreamListener("defaultconsumer")
等等
编辑
我刚刚复制了你的代码并且运行良好...
@SpringBootApplication
@EnableBinding({ IEventConsumer.class, Source.class })
public class So60879187Application {
private static final Logger logger = LoggerFactory.getLogger(So60879187Application.class);
public static void main(String[] args) {
SpringApplication.run(So60879187Application.class, args);
}
@StreamListener("defaultconsumer")
public void subscribe1(String eventMessage) {
logger.info(" DefaultEventConsumer received new event [" + eventMessage.toString() + "] ");
}
@StreamListener("billingconsumer")
public void subscribe2(String eventMessage) {
logger.info(" billingEventConsumer received new event [" + eventMessage.toString() + "] ");
}
@StreamListener("messageconsumer")
public void subscribe3(String eventMessage) {
logger.info(" messageEventConsumer received new event [" + eventMessage.toString() + "] ");
}
@Bean
public ApplicationRunner runner(MessageChannel output) {
return args -> output.send(MessageBuilder.withPayload("foo")
.setHeader("*.events", "test1.events.billing")
.build());
}
}
interface IEventConsumer {
String INPUT = "my-exchange";
@Input
SubscribableChannel defaultconsumer();
@Input
SubscribableChannel billingconsumer();
@Input
SubscribableChannel messageconsumer();
}
spring:
cloud:
stream:
bindings:
defaultconsumer:
destination: my-exchange
group: queue1
billingconsumer:
destination: my-exchange
group: queue2
messageconsumer:
destination: my-exchange
group: queue3
output:
destination: my-exchange
rabbit:
bindings:
defaultconsumer:
consumer:
bindingRoutingKey: '*.events.#'
billingconsumer:
consumer:
bindingRoutingKey: test1.events.billing
messageconsumer:
consumer:
bindingRoutingKey: test2.events.messages
output:
producer:
routing-key-expression: headers['*.events']
application:
name: subscriber-service
server:
port: 15020
和
2020-03-27 09:45:33.607 INFO 30366 --- [change.queue1-1] com.example.demo.So60879187Application
: DefaultEventConsumer received new event [foo]
2020-03-27 09:45:33.607 INFO 30366 --- [change.queue2-1] com.example.demo.So60879187Application
: billingEventConsumer received new event [foo]
EDIT2
更新的函数式编程模型等效...
@SpringBootApplication
public class So608791871Application {
private static final Logger logger = LoggerFactory.getLogger(So608791871Application.class);
public static void main(String[] args) {
SpringApplication.run(So608791871Application.class, args);
}
@Bean
public Consumer<String> defaultconsumer() {
return eventMessage ->
logger.info(" DefaultEventConsumer received new event [" + eventMessage.toString() + "] ");
}
@Bean
public Consumer<String> billingconsumer() {
return eventMessage ->
logger.info(" billingEventConsumer received new event [" + eventMessage.toString() + "] ");
}
@Bean
public Consumer<String> messageconsumer() {
return eventMessage ->
logger.info(" messageEventConsumer received new event [" + eventMessage.toString() + "] ");
}
private final DirectProcessor<Message<?>> output = DirectProcessor.create();
@Bean
public Supplier<Flux<Message<?>>> output() {
return () -> this.output;
}
@Bean
public ApplicationRunner runner() {
Message<String> msg1 = MessageBuilder.withPayload("foo")
.setHeader("*.events", "test1.events.billing")
.build();
Message<String> msg2 = MessageBuilder.withPayload("bar")
.setHeader("*.events", "test2.events.messages")
.build();
return args -> {
this.output.onNext(msg1);
this.output.onNext(msg2);
};
}
}
spring:
cloud:
function:
definition: defaultconsumer;billingconsumer;messageconsumer;output
stream:
bindings:
defaultconsumer-in-0:
destination: my-exchange
group: queue1
billingconsumer-in-0:
destination: my-exchange
group: queue2
messageconsumer-in-0:
destination: my-exchange
group: queue3
output-out-0:
destination: my-exchange
rabbit:
bindings:
defaultconsumer-in-0:
consumer:
bindingRoutingKey: '*.events.#'
billingconsumer-in-0:
consumer:
bindingRoutingKey: test1.events.billing
messageconsumer-in-0:
consumer:
bindingRoutingKey: test2.events.messages
output-out-0:
producer:
routing-key-expression: headers['*.events']
application:
name: subscriber-service
server:
port: 15020
和
2020-03-27 14:28:37.426 INFO 3646 --- [change.queue3-1] com.example.demo.So608791871Application
: messageEventConsumer received new event [bar]
2020-03-27 14:28:37.426 INFO 3646 --- [change.queue1-1] com.example.demo.So608791871Application
: DefaultEventConsumer received new event [foo]
2020-03-27 14:28:37.426 INFO 3646 --- [change.queue2-1] com.example.demo.So608791871Application
: billingEventConsumer received new event [foo]
2020-03-27 14:28:37.429 INFO 3646 --- [change.queue1-1] com.example.demo.So608791871Application
: DefaultEventConsumer received new event [bar]
这个问题类似于
**根据 Garry 的建议更新**
评论:我的 Producer App 有路由键='*.events' application.yml
spring:
cloud:
stream:
bindings:
output:
destination: my-exchange
rabbit:
bindings:
output:
producer:
routing-key-expression: headers['*.events']
application:
name: publisher-service
server:
port: 15010
生产者代码片段 Comment:message 与路由键 ="test.events" 一起发送。我不确定第二个参数,但我假设它是 bindingrouting-key =test1.events.billing 这意味着除了默认消费者之外,我希望它被交付给计费消费者。
source.output().send(MessageBuilder.withPayload(eventRequest.getEventMessage())
.setHeader("*.events", "test1.events.billing")
.build());
消费者配置 评论:我想要 3 个队列分配给 exchange ="myexchange" 。我不确定配置是否正确。 application.yml
spring:
cloud:
stream:
bindings:
defaultconsumer:
destination: my-exchange
group: queue1
billingconsumer:
destination: my-exchange
group: queue2
messageconsumer:
destination: my-exchange
group: queue3
rabbit:
bindings:
defaultconsumer:
consumer:
bindingRoutingKey: '*.events.#'
billingconsumer:
consumer:
bindingRoutingKey: test1.events.billing
messageconsumer:
consumer:
bindingRoutingKey: test2.events.messages
application:
name: subscriber-service
server:
port: 15020
消费者代码: IEventConsumer.java 评论:我不确定下面的代码是否正确
public interface IEventConsumer {
String INPUT = "my-exchange";
@Input
SubscribableChannel defaultconsumer();
@Input
SubscribableChannel billingconsumer();
@Input
SubscribableChannel messageconsumer();
}
EventConsumer.java 评论:我想要的只是下面的消息不应该收到我的 messsageConsumer! 但实际上它通过所有这些方法。
@StreamListener("defaultconsumer")
public void subscribe1(EventMessage eventMessage) {
logger.info(" DefaultEventConsumer received new event [" + eventMessage.toString() + "] ");
}
@StreamListener("billingconsumer")
public void subscribe2(EventMessage eventMessage) {
logger.info(" billingEventConsumer received new event [" + eventMessage.toString() + "] ");
}
@StreamListener("messageconsumer")
public void subscribe3(EventMessage eventMessage) {
logger.info(" messageEventConsumer received new event [" + eventMessage.toString() + "] ");
}
上面显然有问题,我看不到这个工作。有什么想法吗?
@Input(INPUT)
SubscribableChannel defaultconsumer();
@Input(INPUT)
SubscribableChannel billingconsumer();
@Input(INPUT)
SubscribableChannel messageconsumer();
您为所有三个绑定指定了相同的名称;只需使用 @INPUT
,方法名称将用作绑定名称。
和
@StreamListener("defaultconsumer")
等等
编辑
我刚刚复制了你的代码并且运行良好...
@SpringBootApplication
@EnableBinding({ IEventConsumer.class, Source.class })
public class So60879187Application {
private static final Logger logger = LoggerFactory.getLogger(So60879187Application.class);
public static void main(String[] args) {
SpringApplication.run(So60879187Application.class, args);
}
@StreamListener("defaultconsumer")
public void subscribe1(String eventMessage) {
logger.info(" DefaultEventConsumer received new event [" + eventMessage.toString() + "] ");
}
@StreamListener("billingconsumer")
public void subscribe2(String eventMessage) {
logger.info(" billingEventConsumer received new event [" + eventMessage.toString() + "] ");
}
@StreamListener("messageconsumer")
public void subscribe3(String eventMessage) {
logger.info(" messageEventConsumer received new event [" + eventMessage.toString() + "] ");
}
@Bean
public ApplicationRunner runner(MessageChannel output) {
return args -> output.send(MessageBuilder.withPayload("foo")
.setHeader("*.events", "test1.events.billing")
.build());
}
}
interface IEventConsumer {
String INPUT = "my-exchange";
@Input
SubscribableChannel defaultconsumer();
@Input
SubscribableChannel billingconsumer();
@Input
SubscribableChannel messageconsumer();
}
spring:
cloud:
stream:
bindings:
defaultconsumer:
destination: my-exchange
group: queue1
billingconsumer:
destination: my-exchange
group: queue2
messageconsumer:
destination: my-exchange
group: queue3
output:
destination: my-exchange
rabbit:
bindings:
defaultconsumer:
consumer:
bindingRoutingKey: '*.events.#'
billingconsumer:
consumer:
bindingRoutingKey: test1.events.billing
messageconsumer:
consumer:
bindingRoutingKey: test2.events.messages
output:
producer:
routing-key-expression: headers['*.events']
application:
name: subscriber-service
server:
port: 15020
和
2020-03-27 09:45:33.607 INFO 30366 --- [change.queue1-1] com.example.demo.So60879187Application
: DefaultEventConsumer received new event [foo]
2020-03-27 09:45:33.607 INFO 30366 --- [change.queue2-1] com.example.demo.So60879187Application
: billingEventConsumer received new event [foo]
EDIT2
更新的函数式编程模型等效...
@SpringBootApplication
public class So608791871Application {
private static final Logger logger = LoggerFactory.getLogger(So608791871Application.class);
public static void main(String[] args) {
SpringApplication.run(So608791871Application.class, args);
}
@Bean
public Consumer<String> defaultconsumer() {
return eventMessage ->
logger.info(" DefaultEventConsumer received new event [" + eventMessage.toString() + "] ");
}
@Bean
public Consumer<String> billingconsumer() {
return eventMessage ->
logger.info(" billingEventConsumer received new event [" + eventMessage.toString() + "] ");
}
@Bean
public Consumer<String> messageconsumer() {
return eventMessage ->
logger.info(" messageEventConsumer received new event [" + eventMessage.toString() + "] ");
}
private final DirectProcessor<Message<?>> output = DirectProcessor.create();
@Bean
public Supplier<Flux<Message<?>>> output() {
return () -> this.output;
}
@Bean
public ApplicationRunner runner() {
Message<String> msg1 = MessageBuilder.withPayload("foo")
.setHeader("*.events", "test1.events.billing")
.build();
Message<String> msg2 = MessageBuilder.withPayload("bar")
.setHeader("*.events", "test2.events.messages")
.build();
return args -> {
this.output.onNext(msg1);
this.output.onNext(msg2);
};
}
}
spring:
cloud:
function:
definition: defaultconsumer;billingconsumer;messageconsumer;output
stream:
bindings:
defaultconsumer-in-0:
destination: my-exchange
group: queue1
billingconsumer-in-0:
destination: my-exchange
group: queue2
messageconsumer-in-0:
destination: my-exchange
group: queue3
output-out-0:
destination: my-exchange
rabbit:
bindings:
defaultconsumer-in-0:
consumer:
bindingRoutingKey: '*.events.#'
billingconsumer-in-0:
consumer:
bindingRoutingKey: test1.events.billing
messageconsumer-in-0:
consumer:
bindingRoutingKey: test2.events.messages
output-out-0:
producer:
routing-key-expression: headers['*.events']
application:
name: subscriber-service
server:
port: 15020
和
2020-03-27 14:28:37.426 INFO 3646 --- [change.queue3-1] com.example.demo.So608791871Application
: messageEventConsumer received new event [bar]
2020-03-27 14:28:37.426 INFO 3646 --- [change.queue1-1] com.example.demo.So608791871Application
: DefaultEventConsumer received new event [foo]
2020-03-27 14:28:37.426 INFO 3646 --- [change.queue2-1] com.example.demo.So608791871Application
: billingEventConsumer received new event [foo]
2020-03-27 14:28:37.429 INFO 3646 --- [change.queue1-1] com.example.demo.So608791871Application
: DefaultEventConsumer received new event [bar]