spring-integration:将排队的消息分派给选择性消费者

spring-integration: dispatch queued messages to selective consumer

我有一个 spring 集成流程,它生成的消息应该保留在周围等待合适的消费者出现并使用它们。

@Bean
public IntegrationFlow messagesPerCustomerFlow() {
    return IntegrationFlows.
            from(WebFlux.inboundChannelAdapter("/messages/{customer}")
                    .requestMapping(r -> r
                            .methods(HttpMethod.POST)
                    )
                    .requestPayloadType(JsonNode.class)
                    .headerExpression("customer", "#pathVariables.customer")
            )
            .channel(messagesPerCustomerQueue()) 
            .get();
}

@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerSpec poller() {
    return Pollers.fixedRate(100);
}

@Bean
public QueueChannel messagesPerCustomerQueue() {
    return MessageChannels.queue()
            .get();
}

queue 中的消息应作为 server-sent 事件通过 http 传递,如下所示。

PublisherSubscription只是Publisher和IntegrationFlowRegistration的持有者,后者用于在不再需要时销毁动态创建的流(注意GET的传入消息没有内容,这不是通过 Webflux 集成正确处理 ATM,因此需要一个小的解决方法来访问推到 customer header 的路径变量:

@Bean
public IntegrationFlow eventMessagesPerCustomer() {
    return IntegrationFlows
       .from(WebFlux.inboundGateway("/events/{customer}")
            .requestMapping(m -> m.produces(TEXT_EVENT_STREAM_VALUE))
            .headerExpression("customer", "#pathVariables.customer")
            .payloadExpression("''") // neeeded to make handle((p,h) work
       )
       .log()
       .handle((p, h) -> {
           String customer = h.get("customer").toString();
           PublisherSubscription<JsonNode> publisherSubscription =
               subscribeToMessagesPerCustomer(customer);
           return Flux.from(publisherSubscription.getPublisher())
                   .map(Message::getPayload)
                   .doFinally(signalType ->
                      publisherSubscription.unsubscribe());
       })
       .get();
}

上面对 server-sent 事件的请求动态注册了一个流,该流按需使用 selective consumer realized by a filter with throwExceptionOnRejection(true). Following the spec for Message Handler chain 订阅 queue 通道,应确保向所有消费者提供消息直到一个接受它。

public PublisherSubscription<JsonNode> subscribeToMessagesPerCustomer(String customer) {
    IntegrationFlowBuilder flow = IntegrationFlows.from(messagesPerCustomerQueue())
            .filter("headers.customer=='" + customer + "'",
                    filterEndpointSpec -> filterEndpointSpec.throwExceptionOnRejection(true));
    Publisher<Message<JsonNode>> messagePublisher = flow.toReactivePublisher();

    IntegrationFlowRegistration registration = integrationFlowContext.registration(flow.get())
            .register();

    return new PublisherSubscription<>(messagePublisher, registration);
}

这个构造原则上可行,但存在以下问题:

我想要的是消息保留在 queue 中并重复提供给所有订阅者,直到它被消费或过期(适当的选择性消费者)。我该怎么做?

note that the incoming message for the GET has no content, which is not handled properly ATM by the Webflux integration

我不明白这种担忧。

WebFluxInboundEndpoint 使用此算法:

if (isReadable(request)) {
   ...
else {
    return (Mono<T>) Mono.just(exchange.getRequest().getQueryParams());
}

其中 GET 方法真正转到 else 分支。而要发送的消息的 payloadMultiValueMap。此外,我们最近还与您一起解决了 POST 的问题,该版本已在版本 5.0.5 中发布:https://jira.spring.io/browse/INT-4462

Dispatcher has no subscribers

原则上不能在 QueueChannel 上发生。那里根本没有任何调度员。它只是队列和发件人 offers 要存储的消息。您缺少与我们分享的其他内容。但是让我们用自己的名字来称呼事物:messagesPerCustomerQueue 在您的应用程序中不是 QueueChannel

更新

关于:

What I want is that the message remains in the queue and is repeatedly offered to all subscribers until it is either consumed or expires (a proper selective consumer)

只有我们看到的是 PollableJmsChannel 基于嵌入式 ActiveMQ 的消息 TTL。作为这个队列的消费者,你应该有一个 PublishSubscribeChannelsetMinSubscribers(1) 来让 MessagingTemplate 在没有订阅者的时候抛出一个 MessageDeliveryException。这样 JMS 事务将被回滚并且消息将 return 到下一个轮询周期的队列。

内存中的问题 QueueChannel 没有事务性重新传递,一旦从该队列轮询的消息将丢失。

另一个类似于 JMS(事务性)的选项是 JdbcChannelMessageStore 用于 QueueChannel。虽然这样我们没有TTL功能...