Spring Cloud Stream - Routing to multiple destinations with StreamBridge
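I have 3 services. "service-1" dynamically routes to the other 2 services based on the id value passed in the payload.
However, the payment processor always routes to the topic bound to -out-0 ("spring.cloud.stream.function.bindings.processor-out-0"), so service-3 never gets called.
I also tried spring.cloud.stream.sendto.destination, but service-3 still never receives anything.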
Service 1
@Bean
public Function<Flux<String>, Flux<Object>> processor() {
    return messageFlux -> messageFlux.flatMap(stringMessage -> {
        Map<String, String> payload;
        try {
            payload = jsonMapper.readValue(stringMessage, Map.class);
        } catch (JsonProcessingException e) {
            e.printStackTrace();
            return Mono.empty(); // skip messages that are not valid JSON
        }
        logger.info("Payment payload :" + payload.get("id"));
        if (payload.get("id").equalsIgnoreCase("e-payment"))
            streamBridge.send("processor-out-0", stringMessage);
        else
            streamBridge.send("processor-out-1", stringMessage);
        return Mono.just(stringMessage);
    });
}

@Bean
public Consumer<String> consumer() {
    return data -> logger.info("Payment Response :" + data);
}
Service 1 application.properties
server.port = 9000
spring.application.name=payment-process
spring.cloud.function.definition=supplier;processor;consumer
spring.cloud.stream.source=processorCheck;processorEPayment
#bindings for payment processor
spring.cloud.stream.function.bindings.processor-out-0=e-payment
spring.cloud.stream.function.bindings.processor-out-1=check-payment
spring.cloud.stream.function.bindings.processor-in-0=payment-process
#bindings for response for payment processor
spring.cloud.stream.function.bindings.consumer-in-0=payment-response
#grouping
spring.cloud.stream.bindings.processor-in-0.group=check-service
spring.cloud.stream.bindings.processor-out-0.group=check-service
Service 2
@Bean
public Function<String, String> processorCheck() {
    return data -> {
        logger.info("Data received from :" + data);
        return "Check Payment Successful";
    };
}
Service 2 application.properties
server.port = 9002
spring.application.name=check-service
spring.cloud.function.definition=processorCheck
spring.cloud.stream.source=processorCheck
spring.cloud.stream.bindings.processorCheck-in-0.destination=check-payment
spring.cloud.stream.bindings.processorCheck-out-0.destination=payment-response
spring.cloud.stream.bindings.processorCheck-in-0.group=check-service
spring.cloud.stream.bindings.processorCheck-out-0.group=check-service
spring.cloud.stream.function.routing.enabled=true
Consumer service 3
@Bean
public Function<String, String> processorEPayment() {
    return data -> {
        logger.info("Data received from :" + data);
        return "E-Payment Successful";
    };
}
Service 3 application.properties
server.port = 9001
spring.application.name=e-payment-service
spring.cloud.function.definition=processorEPayment
#input to e-payment service topic
spring.cloud.stream.bindings.processorEPayment-in-0.destination=e-payment
spring.cloud.stream.source=processorEPayment
#output response to e-payment topic
spring.cloud.stream.bindings.processorEPayment-out-0.destination=payment-response
spring.cloud.stream.bindings.processorEPayment-in-0.group=e-payment-service
spring.cloud.stream.bindings.processorEPayment-out-0.group=e-payment-service
spring.cloud.stream.function.routing.enabled=true
In your service-1, the first StreamBridge send call can be written as streamBridge.send("e-payment", stringMessage);
The second StreamBridge send call becomes streamBridge.send("check-payment", stringMessage);
The messages will then be sent to the topics e-payment and check-payment respectively.
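To make the routing decision easier to see in isolation, here is a minimal sketch in plain Java with no Spring dependency. The class PaymentRouter, the helper destinationFor, and the BiConsumer standing in for the bridge are hypothetical names introduced only for illustration; in the real service the sender would presumably be streamBridge::send, and the two destination names are the topics mentioned above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

public class PaymentRouter {
    // Destination (topic) names taken from the answer above.
    static final String E_PAYMENT = "e-payment";
    static final String CHECK_PAYMENT = "check-payment";

    // Hypothetical helper: maps the payload's id to a destination name.
    // Anything that is not "e-payment" (including null) goes to check-payment.
    static String destinationFor(String id) {
        return E_PAYMENT.equalsIgnoreCase(id) ? E_PAYMENT : CHECK_PAYMENT;
    }

    // 'sender' is an assumption standing in for the (destination, payload) send call.
    static void route(String id, String message, BiConsumer<String, String> sender) {
        sender.accept(destinationFor(id), message);
    }

    public static void main(String[] args) {
        // Fake bridge that records what would have been sent and where.
        List<String> sent = new ArrayList<>();
        BiConsumer<String, String> fakeBridge = (dest, msg) -> sent.add(dest + " <- " + msg);

        route("e-payment", "{\"id\":\"e-payment\"}", fakeBridge);
        route("check", "{\"id\":\"check\"}", fakeBridge);

        sent.forEach(System.out::println);
    }
}
```

The point of the sketch is that the first argument passed to the sender is the destination name itself, not a binding name such as processor-out-0.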
Remove these two properties from service-1 (they are unnecessary and may change the semantics of the bindings):
spring.cloud.stream.function.bindings.processor-out-0=e-payment
spring.cloud.stream.function.bindings.processor-out-1=check-payment
You also need to add a destination for the output binding:
spring.cloud.stream.bindings.processor-out-0.destination=payment-response
The consumer will then consume from that topic (because you changed the consumer's binding to reflect that topic).