Spring 云功能路由
Spring Cloud function Routing
我正在尝试通过实施 Spring 云功能来创建一个 kafka。我使用的是spring-引导版本2.4.0和spring云版本:2020.0.0
@Slf4j
@Component
public class ArticleEventPublisher implements Function<ArticleAggregatedDocument, Message<ArticleMessage>> {
private final ArticleMessageMapper articleMessageMapper;
public ArticleEventPublisher(
ArticleMessageMapper articleMessageMapper) {
this.articleMessageMapper = articleMessageMapper;
}
@Override
public Message<ArticleMessage> apply(ArticleAggregatedDocument articleAggregatedDocument) {
String destination ="articleAggregated";
log.info("CREATING MESSAGE IN FUNCTION BEAN");
return createMessage( articleAggregatedDocument, destination);
}
private Message<ArticleMessage> createMessage(ArticleAggregatedDocument articleAggregatedDocument, String destination) {
ArticleMessage articleMessage = articleMessageMapper.apply(articleAggregatedDocument);
return MessageBuilder
.withPayload(articleMessage)
.setHeader(KafkaHeaders.MESSAGE_KEY, articleMessage.getKey().getBytes())
.setHeader("spring.cloud.stream.sendto.destination", destination)
.build();
}
}
我知道它不是动态绑定,目标是硬编码的,但是,我现在想测试这个特定的实现。
尽管应用程序中的控件确实转到 class 并且正在创建消息,但我在 'destination'.
中看不到它们
我在 application.properties 中添加了以下内容:
spring.cloud.function.routing.enabled= true
spring.cloud.function.scan.packages= **publisher package name**
spring.cloud.stream.bindings.articleAggregated.destination=article.aggregated
spring.cloud.stream.bindings.articleAggregated.contentType=application/json
spring.cloud.stream.bindings.articleAggregated.producer.autoStartup=false
spring.cloud.stream.bindings.articleAggregated.producer.headerMode=none
spring.cloud.stream.kafka.bindings.articleAggregated.producer.configuration.client.id=articleAggregated
spring.cloud.stream.kafka.bindings.articleAggregated.producer.sync=true
这里还要提一下,下面的属性配置在IDEA intelliJ中不再解析:
spring.cloud.function.routing.enabled
spring.cloud.function.scan.packages
它们是否已弃用?我在文档中找不到任何相关信息。
我还从文档中了解到,如果是多绑定器,则应为每个 producer/consumer 添加绑定器,但我们只为应用程序中的所有消费者和生产者使用一个绑定器,因此我没有添加它。但我确实在绝望的时刻尝试过这个:
spring.cloud.stream.bindings.articleAggregated.binder=kafka
我在这里错过了什么?我已经坚持了一段时间,同时切换到 Sink+Supplier 实现作为替代方案。但我想了解为什么这个实现不起作用。
谢谢拉什米。
让我想出最好的答案,但随时可以提出更多问题。所以。 . .
- 在您的示例中,我看到了一个函数
userInfo
。这将导致两个绑定 - userInfo-in-0
用于输入,userInfo-out-0
用于输出。
- 由于此函数生成带有
. . .sendto.destination
header 的消息,因此 userInfo-out-0
绑定将仅在 sendto.destination
header 不是的情况下用作输出目标设置。
- 在您的配置中,我看到
spring.cloud.function.routing.enabled
、spring.cloud.function.scan.packages
、spring.kafka.bootstrap-servers
和三个 spring.cloud.stream.kafka.*
属性,它们指向默认的本地主机 kafka 实例,我们已经知道默认,因此这些属性都不是必需的。
- 你也不需要
spring.cloud.stream.bindings.articleAggregated.binder=kafka
属性 除非你的类路径上有多个你不需要的活页夹。
现在我对spring.cloud.stream.bindings.userAggregated.destination
属性感到困惑。 userAggregated
是动态目的地。通过简单地发送给它 s-c-stream 如果它不存在,它将自动在 Kafka 中提供它,否则它将被使用,你可以在那里看到你的消息。
换句话说,您可以删除所有已配置的属性,将消息发送到 Kafka 中的 userInfo-out-0
目的地,您应该会在 userAggregated
目的地成功检索它。
如果我遗漏了什么,请告诉我。
我正在尝试通过实施 Spring 云功能来创建一个 kafka。我使用的是spring-引导版本2.4.0和spring云版本:2020.0.0
@Slf4j
@Component
public class ArticleEventPublisher implements Function<ArticleAggregatedDocument, Message<ArticleMessage>> {
private final ArticleMessageMapper articleMessageMapper;
public ArticleEventPublisher(
ArticleMessageMapper articleMessageMapper) {
this.articleMessageMapper = articleMessageMapper;
}
@Override
public Message<ArticleMessage> apply(ArticleAggregatedDocument articleAggregatedDocument) {
String destination ="articleAggregated";
log.info("CREATING MESSAGE IN FUNCTION BEAN");
return createMessage( articleAggregatedDocument, destination);
}
private Message<ArticleMessage> createMessage(ArticleAggregatedDocument articleAggregatedDocument, String destination) {
ArticleMessage articleMessage = articleMessageMapper.apply(articleAggregatedDocument);
return MessageBuilder
.withPayload(articleMessage)
.setHeader(KafkaHeaders.MESSAGE_KEY, articleMessage.getKey().getBytes())
.setHeader("spring.cloud.stream.sendto.destination", destination)
.build();
}
}
我知道它不是动态绑定,目标是硬编码的,但是,我现在想测试这个特定的实现。 尽管应用程序中的控件确实转到 class 并且正在创建消息,但我在 'destination'.
中看不到它们我在 application.properties 中添加了以下内容:
spring.cloud.function.routing.enabled= true
spring.cloud.function.scan.packages= **publisher package name**
spring.cloud.stream.bindings.articleAggregated.destination=article.aggregated
spring.cloud.stream.bindings.articleAggregated.contentType=application/json
spring.cloud.stream.bindings.articleAggregated.producer.autoStartup=false
spring.cloud.stream.bindings.articleAggregated.producer.headerMode=none
spring.cloud.stream.kafka.bindings.articleAggregated.producer.configuration.client.id=articleAggregated
spring.cloud.stream.kafka.bindings.articleAggregated.producer.sync=true
这里还要提一下,下面的属性配置在IDEA intelliJ中不再解析:
spring.cloud.function.routing.enabled
spring.cloud.function.scan.packages
它们是否已弃用?我在文档中找不到任何相关信息。
我还从文档中了解到,如果是多绑定器,则应为每个 producer/consumer 添加绑定器,但我们只为应用程序中的所有消费者和生产者使用一个绑定器,因此我没有添加它。但我确实在绝望的时刻尝试过这个:
spring.cloud.stream.bindings.articleAggregated.binder=kafka
我在这里错过了什么?我已经坚持了一段时间,同时切换到 Sink+Supplier 实现作为替代方案。但我想了解为什么这个实现不起作用。
谢谢拉什米。 让我想出最好的答案,但随时可以提出更多问题。所以。 . .
- 在您的示例中,我看到了一个函数
userInfo
。这将导致两个绑定 -userInfo-in-0
用于输入,userInfo-out-0
用于输出。 - 由于此函数生成带有
. . .sendto.destination
header 的消息,因此userInfo-out-0
绑定将仅在sendto.destination
header 不是的情况下用作输出目标设置。 - 在您的配置中,我看到
spring.cloud.function.routing.enabled
、spring.cloud.function.scan.packages
、spring.kafka.bootstrap-servers
和三个spring.cloud.stream.kafka.*
属性,它们指向默认的本地主机 kafka 实例,我们已经知道默认,因此这些属性都不是必需的。 - 你也不需要
spring.cloud.stream.bindings.articleAggregated.binder=kafka
属性 除非你的类路径上有多个你不需要的活页夹。
现在我对spring.cloud.stream.bindings.userAggregated.destination
属性感到困惑。 userAggregated
是动态目的地。通过简单地发送给它 s-c-stream 如果它不存在,它将自动在 Kafka 中提供它,否则它将被使用,你可以在那里看到你的消息。
换句话说,您可以删除所有已配置的属性,将消息发送到 Kafka 中的 userInfo-out-0
目的地,您应该会在 userAggregated
目的地成功检索它。
如果我遗漏了什么,请告诉我。