Spring 订阅者的 pubsub 过滤消息
Spring pubsub Filter Messages for subscriber
我尝试使用 pubsub 实现请求异步响应模式。我通过使用 spring 集成来做到这一点。我在两端定义了一个主题和两个订阅(异步响应的事件发送者和请求的事件消费者)。但是如果消费者发送响应,它会发送给发送者和消费者。但是消费者事件是空的。到目前为止一切都很好。我的问题是如何在 springs pubsub 集成中为消息定义过滤器。这是 Google pubsub 中的一个功能。
我建议您使用 Spring 过滤器,让您可以采取行动,判断消息是应该丢弃还是转发到消息频道。你可以看到更多information.
你可以看到这些例子。
@Filter(inputChannel = "inputChannel", outputChannel = "secretChannel")
boolean filter(Message<?> message) {
String msg = message.getPayload().toString();
return msg.contains("secret");
}
您可以在 spring 中查看有关如何实现过滤的示例。你可以看到更多examples.
package com.zj.node.contentcenter.controller.content;
import lombok.RequiredArgsConstructor;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* Producer
*
* @author 01
* @date 2019-08-03
**/
@RestController
@RequiredArgsConstructor
public class TestProducerController {
private final Source source;
@GetMapping("/stream-send-msg")
public String streamSendMsg(String flagHeader) {
source.output().send(
MessageBuilder.withPayload("Message Body")
// Setting the header for filtering messages
.setHeader("flag-header", flagHeader)
.build()
);
return "send message success!";
}
}
我尝试使用 pubsub 实现请求异步响应模式。我通过使用 spring 集成来做到这一点。我在两端定义了一个主题和两个订阅(异步响应的事件发送者和请求的事件消费者)。但是如果消费者发送响应,它会发送给发送者和消费者。但是消费者事件是空的。到目前为止一切都很好。我的问题是如何在 springs pubsub 集成中为消息定义过滤器。这是 Google pubsub 中的一个功能。
我建议您使用 Spring 过滤器,让您可以采取行动,判断消息是应该丢弃还是转发到消息频道。你可以看到更多information.
你可以看到这些例子。
@Filter(inputChannel = "inputChannel", outputChannel = "secretChannel")
boolean filter(Message<?> message) {
String msg = message.getPayload().toString();
return msg.contains("secret");
}
您可以在 spring 中查看有关如何实现过滤的示例。你可以看到更多examples.
package com.zj.node.contentcenter.controller.content;
import lombok.RequiredArgsConstructor;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* Producer
*
* @author 01
* @date 2019-08-03
**/
@RestController
@RequiredArgsConstructor
public class TestProducerController {
private final Source source;
@GetMapping("/stream-send-msg")
public String streamSendMsg(String flagHeader) {
source.output().send(
MessageBuilder.withPayload("Message Body")
// Setting the header for filtering messages
.setHeader("flag-header", flagHeader)
.build()
);
return "send message success!";
}
}