Route FROM and route TO with Spring Cloud Stream and functions

I have some questions about the new routing feature in Spring Cloud Stream.

I am trying to implement a simple scenario: I want to send a message with the header spring.cloud.function.definition set to either consume1 or consume2.

I expect consume1 or consume2 to be invoked depending on the value sent in the header, but the methods are invoked seemingly at random.

I use the RabbitMQ management console to publish the messages to the exchange named consumer.

I get the following logs:

2020-02-27 14:48:25.896  INFO 22132 --- [ consumer.app-1] com.example.demo.TestConsumer            : ==============>consume1 messge [[payload=ok, headers={amqp_receivedDeliveryMode=NON_PERSISTENT, amqp_receivedRoutingKey=#, amqp_receivedExchange=consumer, amqp_deliveryTag=1, deliveryAttempt=1, amqp_consumerQueue=consumer.app, amqp_redelivered=false, id=9a4dff25-88ef-4d76-93e2-c8719cda122d, spring.cloud.function.definition=consume1, amqp_consumerTag=amq.ctag-gGChFNCKIVd25yyR9H6-fQ, sourceData=(Body:'[B@3a92faa7(byte[2])' MessageProperties [headers={spring.cloud.function.definition=consume1}, contentLength=0, receivedDeliveryMode=NON_PERSISTENT, redelivered=false, receivedExchange=consumer, receivedRoutingKey=#, deliveryTag=1, consumerTag=amq.ctag-gGChFNCKIVd25yyR9H6-fQ, consumerQueue=consumer.app]), timestamp=1582811303347}]]
2020-02-27 14:48:25.984  INFO 22132 --- [nio-8080-exec-1] o.a.c.c.C.[Tomcat].[localhost].[/]       : Initializing Spring DispatcherServlet 'dispatcherServlet'
2020-02-27 14:48:25.984  INFO 22132 --- [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet        : Initializing Servlet 'dispatcherServlet'
2020-02-27 14:48:25.991  INFO 22132 --- [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet        : Completed initialization in 7 ms
2020-02-27 14:48:26.037  INFO 22132 --- [oundedElastic-1] o.s.i.monitor.IntegrationMBeanExporter   : Registering MessageChannel customer-1
2020-02-27 14:48:26.111  INFO 22132 --- [oundedElastic-1] o.s.c.s.m.DirectWithAttributesChannel    : Channel 'application.customer-1' has 1 subscriber(s).
2020-02-27 14:48:26.116  INFO 22132 --- [oundedElastic-1] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [localhost:5672]
2020-02-27 14:48:26.123  INFO 22132 --- [oundedElastic-1] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory.publisher#32438e24:0/SimpleConnection@3e58666d [delegate=amqp://guest@127.0.0.1:5672/, localPort= 62514]
2020-02-27 14:48:26.139  INFO 22132 --- [-1.customer-1-1] o.s.i.h.s.MessagingMethodInvokerHelper   : Overriding default instance of MessageHandlerMethodFactory with provided one.
2020-02-27 14:48:26.140  INFO 22132 --- [-1.customer-1-1] com.example.demo.TestSink                : Data received customer-1...body
2020-02-27 14:49:14.185  INFO 22132 --- [ consumer.app-1] o.s.i.h.s.MessagingMethodInvokerHelper   : Overriding default instance of MessageHandlerMethodFactory with provided one.
2020-02-27 14:49:14.194  INFO 22132 --- [ consumer.app-1] com.example.demo.TestConsumer            : ==============>consume2 messge [[payload=ok, headers={amqp_receivedDeliveryMode=NON_PERSISTENT, amqp_receivedRoutingKey=#, amqp_receivedExchange=consumer, amqp_deliveryTag=1, deliveryAttempt=1, amqp_consumerQueue=consumer.app, amqp_redelivered=false, id=33581edb-2832-1c92-b765-a05794512b34, spring.cloud.function.definition=consume1, amqp_consumerTag=amq.ctag-RIp2nZdcG2a0hNQeImwtBw, sourceData=(Body:'[B@8159793(byte[2])' MessageProperties [headers={spring.cloud.function.definition=consume1}, contentLength=0, receivedDeliveryMode=NON_PERSISTENT, redelivered=false, receivedExchange=consumer, receivedRoutingKey=#, deliveryTag=1, consumerTag=amq.ctag-RIp2nZdcG2a0hNQeImwtBw, consumerQueue=consumer.app]), timestamp=1582811354186}]]
2020-02-27 14:49:14.203  INFO 22132 --- [oundedElastic-1] o.s.i.monitor.IntegrationMBeanExporter   : Registering MessageChannel customer-2
2020-02-27 14:49:14.213  INFO 22132 --- [oundedElastic-1] o.s.c.s.m.DirectWithAttributesChannel    : Channel 'application.customer-2' has 1 subscriber(s).
2020-02-27 14:49:14.216  INFO 22132 --- [-2.customer-2-1] o.s.i.h.s.MessagingMethodInvokerHelper   : Overriding default instance of MessageHandlerMethodFactory with provided one.
2020-02-27 14:49:14.216  INFO 22132 --- [-2.customer-2-1] com.example.demo.TestSink                : Data received customer-2...body

application.yml

spring:
  main:
    allow-bean-definition-overriding: true
spring.cloud.stream:
  function.definition: supplier;receive1;receive2;consume1;consume2
  function.routing:
    enabled: true

  bindings:
    consume1-in-0.destination: consumer
    consume1-in-0.group: app
    consume2-in-0.destination: consumer
    consume2-in-0.group: app
    receive1-in-0.destination: customer-1
    receive1-in-0.group: customer-1
    receive2-in-0.destination: customer-2
    receive2-in-0.group: customer-2

DemoApplication.kt

import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.commons.logging.Log
import org.apache.commons.logging.LogFactory
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.runApplication
import org.springframework.context.annotation.Bean
import org.springframework.http.HttpStatus
import org.springframework.messaging.Message
import org.springframework.messaging.support.MessageBuilder
import org.springframework.stereotype.Component
import org.springframework.web.bind.annotation.PathVariable
import org.springframework.web.bind.annotation.RequestMapping
import org.springframework.web.bind.annotation.RequestMethod.GET
import org.springframework.web.bind.annotation.ResponseStatus
import org.springframework.web.bind.annotation.RestController
import org.springframework.web.client.RestTemplate
import reactor.core.publisher.EmitterProcessor
import reactor.core.publisher.Flux
import java.util.function.Consumer
import java.util.function.Supplier


@SpringBootApplication
class DemoApplication

fun main(args: Array<String>) {
    runApplication<DemoApplication>(*args)
}

@RestController
class DynamicDestinationController(private val jsonMapper: ObjectMapper) {

    private val processor: EmitterProcessor<Message<String>> = EmitterProcessor.create<Message<String>>()

    @RequestMapping(path = ["/api/dest/{destName}"], method = [GET], consumes = ["*/*"])
    @ResponseStatus(HttpStatus.ACCEPTED)
    fun handleRequest(@PathVariable destName:String) {
        val message: Message<String> = MessageBuilder.withPayload("body")
                .setHeader("spring.cloud.stream.sendto.destination", destName).build()
        processor.onNext(message)
    }

    @Bean
    fun supplier(): Supplier<Flux<Message<String>>> {
        return Supplier { processor }
    }
}

const val destResourceUrl = "http://localhost:8080/api/dest"
@Component
class TestConsumer() {

    private val restTemplate: RestTemplate = RestTemplate()
    private val logger: Log = LogFactory.getLog(javaClass)

    @Bean
    fun consume1(): Consumer<Message<String>> = Consumer {
        logger.info("==============>consume1 messge [[payload=${it.payload}, headers=${it.headers}]]")
        restTemplate.getForEntity("$destResourceUrl/customer-1", String::class.java)
    }

    @Bean
    fun consume2(): Consumer<Message<String>> = Consumer {
        logger.info("==============>consume2 messge [[payload=${it.payload}, headers=${it.headers}]]")
        restTemplate.getForEntity("$destResourceUrl/customer-2", String::class.java)
    }
}


@Component
class TestSink {
    private val logger: Log = LogFactory.getLog(javaClass)
    @Bean
    fun receive1(): Consumer<String> = Consumer {
        logger.info("Data received customer-1...$it")
    }

    @Bean
    fun receive2(): Consumer<String> = Consumer {
        logger.info("Data received customer-2...$it")
    }
}

Any idea how to fix the routing to the consumers?

Thanks in advance.

demo-repo

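I am actually a bit confused too, so let's take it one step at a time. Here is a working application (modeled after yours) that uses the sendto feature, which lets you send messages to specific (existing and/or dynamically resolved) destinations.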

(It is in Java, but you can rewrite it in Kotlin.)

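import java.util.function.Consumer;
import java.util.function.Supplier;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.http.HttpStatus;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseStatus;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;

// @SpringBootApplication enables the auto-configuration that sets up the web layer and the binder.
@SpringBootApplication
@Controller
public class WebSourceApplication {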

    public static void main(String[] args) {
        SpringApplication.run(WebSourceApplication.class,
                "--spring.cloud.function.definition=supplier;consA;consB",
                "--spring.cloud.stream.bindings.consA-in-0.destination=consumerA",
                "--spring.cloud.stream.bindings.consA-in-0.group=consumerA-grp",
                "--spring.cloud.stream.bindings.consB-in-0.destination=consumerB",
                "--spring.cloud.stream.bindings.consB-in-0.group=consumerB-grp"
                );
    }

    EmitterProcessor<Message<String>> processor = EmitterProcessor.create();

    @RequestMapping(path = "/api/dest/{destName}", consumes = "*/*")
    @ResponseStatus(HttpStatus.ACCEPTED)
    public void delegateToSupplier(@RequestBody String body, @PathVariable String destName) {
        Message<String>  message = MessageBuilder.withPayload(body)
            .setHeader("spring.cloud.stream.sendto.destination", destName)
            .build();
        processor.onNext(message);
    }

    @Bean
    public Supplier<Flux<Message<String>>> supplier() {
        return () -> processor;
    }

    @Bean
    public Consumer<String> consA() {
        return v -> {
            System.out.println("Consuming from consA:  " + v);
        };
    }

    @Bean
    public Consumer<String> consB() {
        return v -> {
            System.out.println("Consuming from consB:  " + v);
        };
    }
}

When I curl it, the corresponding consumer is invoked consistently:

curl -H "Content-Type: application/json" -X POST -d "Hello Spring Cloud Stream" http://localhost:8080/api/dest/consumerA
log: Consuming from consA:  Hello Spring Cloud Stream
. . .

curl -H "Content-Type: application/json" -X POST -d "Hello Spring Cloud Stream" http://localhost:8080/api/dest/consumerB
log: Consuming from consB:  Hello Spring Cloud Stream
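
A plausible reason the calls stay consistent here, while they alternated in the question, is that consA and consB each have their own destination and group. In the question, consume1-in-0 and consume2-in-0 share the destination consumer and the group app, so both functions appear to subscribe to the single queue consumer.app (the two log entries show different consumer tags on that queue). A broker normally hands each message on a shared queue to one subscriber in turn, without looking at the headers. The sketch below is only a toy model of that behaviour in plain Java, offered as an assumption rather than a confirmed diagnosis; SharedQueueSketch and its methods are hypothetical names, and no RabbitMQ or Spring Cloud Stream code is involved.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Toy model of one queue shared by several consumers. It imitates round-robin
// dispatch only; it is not RabbitMQ or Spring Cloud Stream code.
class SharedQueueSketch {

    private final List<Consumer<String>> subscribers = new ArrayList<>();
    private int next = 0;

    void subscribe(Consumer<String> consumer) {
        subscribers.add(consumer);
    }

    // Each message goes to exactly one subscriber, chosen in turn.
    // Message headers play no part in the choice.
    void publish(String payload) {
        Consumer<String> target = subscribers.get(next);
        next = (next + 1) % subscribers.size();
        target.accept(payload);
    }

    // Publishes `count` messages to a queue shared by two consumers and
    // returns the name of the consumer that handled each message.
    static List<String> simulate(int count) {
        List<String> handledBy = new ArrayList<>();
        SharedQueueSketch queue = new SharedQueueSketch();
        queue.subscribe(p -> handledBy.add("consume1"));
        queue.subscribe(p -> handledBy.add("consume2"));
        for (int i = 0; i < count; i++) {
            queue.publish("ok");
        }
        return handledBy;
    }

    public static void main(String[] args) {
        System.out.println(simulate(4));
    }
}
```

In this model the two consumers simply alternate, whatever header the message carries, which matches the pattern in the question's logs. Giving each consumer its own destination, as in the application above, removes the competition.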

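Note that the routing property is not enabled in this application. The main purpose of that feature is to always invoke a single function, functionRouter, and let it invoke other functions on your behalf. It is a feature of spring-cloud-function, which means it works outside of spring-cloud-stream, channels/destinations, and so on.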
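
To make the idea of a single function that invokes others on your behalf more concrete, here is a minimal sketch in plain Java. It is not the spring-cloud-function implementation; HeaderRouterSketch, register, and route are hypothetical names, and the only detail taken from this thread is the header name spring.cloud.function.definition.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Toy dispatcher: a single entry point that picks the target function from a
// message header. Illustrative only; not the spring-cloud-function router.
class HeaderRouterSketch {

    static final String HEADER = "spring.cloud.function.definition";

    private final Map<String, Consumer<String>> functions = new HashMap<>();

    void register(String name, Consumer<String> function) {
        functions.put(name, function);
    }

    // Looks up the function named in the header and invokes it with the payload.
    void route(Map<String, String> headers, String payload) {
        String name = headers.get(HEADER);
        Consumer<String> target = functions.get(name);
        if (target == null) {
            throw new IllegalArgumentException("No function registered for: " + name);
        }
        target.accept(payload);
    }

    // Routes two messages and returns what each consumer recorded.
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        HeaderRouterSketch router = new HeaderRouterSketch();
        router.register("consA", v -> log.add("consA: " + v));
        router.register("consB", v -> log.add("consB: " + v));
        router.route(Map.of(HEADER, "consB"), "first");
        router.route(Map.of(HEADER, "consA"), "second");
        return log;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

The point of the sketch is that the header is only consulted by the router itself. A function bound directly to a destination never sees this lookup step.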

Isn't this what you are trying to accomplish: sending messages to different destinations based on a path variable in the HTTP request?

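Here is a different example: a microservice that receives messages on the routing function, which then routes them to different functions.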

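import java.util.function.Consumer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

// @SpringBootApplication enables the auto-configuration that sets up the binder.
@SpringBootApplication
public class FunctionRoutingApplication {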

    public static void main(String[] args) {
        SpringApplication.run(FunctionRoutingApplication.class,
                "--spring.cloud.stream.function.routing.enabled=true"
                );
    }

    @Bean
    public Consumer<String> consA() {
        return v -> {
            System.out.println("Consuming from consA:  " + v);
        };
    }

    @Bean
    public Consumer<String> consB() {
        return v -> {
            System.out.println("Consuming from consB:  " + v);
        };
    }
}

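That's pretty much it. Go to your broker and send data to the functionRouter-in-0 exchange while providing the header spring.cloud.function.definition=consA or spring.cloud.function.definition=consB, and you will see consistent invocation.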
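
If you would rather script that step than click through the management console, the following sketch publishes a message through the RabbitMQ management HTTP API using the JDK's HttpClient (Java 11 or later). Several things here are assumptions, not taken from this thread: the management plugin listening on localhost:15672, the guest/guest credentials, the default virtual host, and the routing key "#" (borrowed from the question's logs). PublishToRouterSketch is a hypothetical name.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

class PublishToRouterSketch {

    // Assumed defaults for a local broker with the management plugin enabled.
    static final String BASE_URL = "http://localhost:15672";
    static final String CREDENTIALS = "guest:guest";

    // JSON body for the management API publish endpoint. Values are inserted
    // as-is, so they must not contain characters that need JSON escaping.
    static String body(String functionName, String payload) {
        return "{\"properties\":{\"headers\":{\"spring.cloud.function.definition\":\""
                + functionName + "\"}},"
                + "\"routing_key\":\"#\","
                + "\"payload\":\"" + payload + "\","
                + "\"payload_encoding\":\"string\"}";
    }

    // Builds the POST request without sending it.
    static HttpRequest request(String functionName, String payload) {
        String auth = Base64.getEncoder()
                .encodeToString(CREDENTIALS.getBytes(StandardCharsets.UTF_8));
        // %2F is the URL-encoded default virtual host "/".
        return HttpRequest.newBuilder(
                        URI.create(BASE_URL + "/api/exchanges/%2F/functionRouter-in-0/publish"))
                .header("Authorization", "Basic " + auth)
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body(functionName, payload)))
                .build();
    }

    public static void main(String[] args) throws Exception {
        // Requires a running broker; prints the HTTP status and response body.
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request("consA", "Hello"), HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```

Changing the first argument of request from consA to consB should switch which consumer logs the message, provided the routing application above is running and bound to that exchange.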

Am I still missing something?