Spring 云功能路由

Spring Cloud function Routing

我正在尝试通过实施 Spring 云功能来创建一个 kafka。我使用的是spring-引导版本2.4.0和spring云版本:2020.0.0

@Slf4j
@Component
public class ArticleEventPublisher implements Function<ArticleAggregatedDocument, Message<ArticleMessage>> {

    private final ArticleMessageMapper articleMessageMapper;

    public ArticleEventPublisher(
        ArticleMessageMapper articleMessageMapper) {
        this.articleMessageMapper = articleMessageMapper;
    }

    @Override
    public Message<ArticleMessage> apply(ArticleAggregatedDocument articleAggregatedDocument) {
        String destination ="articleAggregated";
        log.info("CREATING MESSAGE IN FUNCTION BEAN");
        return createMessage( articleAggregatedDocument, destination);
    }

    private Message<ArticleMessage> createMessage(ArticleAggregatedDocument articleAggregatedDocument, String destination) {
        ArticleMessage articleMessage = articleMessageMapper.apply(articleAggregatedDocument);

        return MessageBuilder
            .withPayload(articleMessage)
            .setHeader(KafkaHeaders.MESSAGE_KEY, articleMessage.getKey().getBytes())
            .setHeader("spring.cloud.stream.sendto.destination", destination)
            .build();
    }
}

我知道它不是动态绑定,目标是硬编码的,但是,我现在想测试这个特定的实现。 尽管应用程序中的控件确实转到 class 并且正在创建消息,但我在 'destination'.

中看不到它们

我在 application.properties 中添加了以下内容:

spring.cloud.function.routing.enabled= true
spring.cloud.function.scan.packages= **publisher package name**
spring.cloud.stream.bindings.articleAggregated.destination=article.aggregated
spring.cloud.stream.bindings.articleAggregated.contentType=application/json
spring.cloud.stream.bindings.articleAggregated.producer.autoStartup=false
spring.cloud.stream.bindings.articleAggregated.producer.headerMode=none
spring.cloud.stream.kafka.bindings.articleAggregated.producer.configuration.client.id=articleAggregated
spring.cloud.stream.kafka.bindings.articleAggregated.producer.sync=true

这里还要提一下,下面的属性配置在IDEA intelliJ中不再解析:

spring.cloud.function.routing.enabled
spring.cloud.function.scan.packages

它们是否已弃用?我在文档中找不到任何相关信息。

我还从文档中了解到,如果是多绑定器,则应为每个 producer/consumer 添加绑定器,但我们只为应用程序中的所有消费者和生产者使用一个绑定器,因此我没有添加它。但我确实在绝望的时刻尝试过这个:

spring.cloud.stream.bindings.articleAggregated.binder=kafka

我在这里错过了什么?我已经坚持了一段时间,同时切换到 Sink+Supplier 实现作为替代方案。但我想了解为什么这个实现不起作用。

谢谢拉什米。 让我想出最好的答案,但随时可以提出更多问题。所以。 . .

  • 在您的示例中,我看到了一个函数 userInfo。这将导致两个绑定 - userInfo-in-0 用于输入,userInfo-out-0 用于输出。
  • 由于此函数生成带有 . . .sendto.destination header 的消息,因此 userInfo-out-0 绑定将仅在 sendto.destination header 不是的情况下用作输出目标设置。
  • 在您的配置中,我看到 spring.cloud.function.routing.enabledspring.cloud.function.scan.packagesspring.kafka.bootstrap-servers 和三个 spring.cloud.stream.kafka.* 属性,它们指向默认的本地主机 kafka 实例,我们已经知道默认,因此这些属性都不是必需的。
  • 你也不需要 spring.cloud.stream.bindings.articleAggregated.binder=kafka 属性 除非你的类路径上有多个你不需要的活页夹。

现在我对spring.cloud.stream.bindings.userAggregated.destination属性感到困惑。 userAggregated 是动态目的地。通过简单地发送给它 s-c-stream 如果它不存在,它将自动在 Kafka 中提供它,否则它将被使用,你可以在那里看到你的消息。

换句话说,您可以删除所有已配置的属性,将消息发送到 Kafka 中的 userInfo-out-0 目的地,您应该会在 userAggregated 目的地成功检索它。

如果我遗漏了什么,请告诉我。