拦截 Spring Cloud Stream SubscribableChannel 的传入消息
Intercept incoming message of Spring Cloud Stream SubscribableChannel
因为 org.springframework.messaging.support.ChannelInterceptor
的 postReceive 方法
不会在 org.springframework.messaging.SubscribableChannel
上被调用,有什么方法可以拦截发送到带有 @StreamListener(Sink.INPUT) 注解的方法的所有传入消息
?
例如:
在进入 handle
方法之前拦截消息
// Listener method annotated for the Sink.INPUT channel. The question asks how
// to intercept each incoming message before this method is entered.
@StreamListener(Sink.INPUT)
public void handle(Foo foo) {
// ...
}
以下是我对 Spring Cloud Stream
的设置
/**
 * Declares the two inbound channels used by the application, together with
 * the names under which they are annotated.
 */
public interface EventSink {

    /** Name used in the {@code @Input} annotation of {@link #input1()}. */
    String INPUT1 = "input1";

    /**
     * @return the subscribable channel declared with {@code @Input(INPUT1)}
     */
    @Input(EventSink.INPUT1)
    SubscribableChannel input1();

    /** Name used in the {@code @Input} annotation of {@link #input2()}. */
    String INPUT2 = "input2";

    /**
     * @return the subscribable channel declared with {@code @Input(INPUT2)}
     */
    @Input(EventSink.INPUT2)
    SubscribableChannel input2();
}
public interface EventSource {
String OUTPUT1 = "output1";
String OUTPUT2 = "output2";
@Output(OUTPUT1)
MessageChannel output1();
@Output(OUTPUT2)
MessageChannel output2()';
}
# Binding configuration: each channel name declared in EventSink/EventSource
# is mapped to a destination of the same name.
spring:
  cloud:
    stream:
      bindings:
        input1:
          destination: input1
        input2:
          destination: input2
        output1:
          destination: output1
        output2:
          destination: output2
// Holds one @StreamListener method per input channel name declared in EventSink.
// NOTE(review): no stereotype or @EnableBinding annotation is visible on this
// class in the snippet; presumably it is registered elsewhere - confirm.
public class EventHandler {
// Listener annotated for EventSink.INPUT1; takes a Foo1 argument.
@StreamListener(EventSink.INPUT1)
public void handle(Foo1 foo) {
// ...
}
// Listener annotated for EventSink.INPUT2; overload taking a Foo2 argument.
@StreamListener(EventSink.INPUT2)
public void handle(Foo2 foo) {
// ...
}
}
/**
 * Service that sends a message with the fixed payload {@code "bar1"} to the
 * {@code output1} channel of the injected {@code EventSource}.
 */
@Service
public class Bar1Service {

    /** Source of the output channels; field-injected. */
    @Autowired
    private EventSource source;

    /**
     * Obtains the {@code output1} channel, builds a message whose payload is
     * {@code "bar1"}, and sends it on that channel.
     */
    public void bar1() {
        this.source.output1()
                .send(MessageBuilder
                        .withPayload("bar1")
                        .build());
    }
}
/**
 * Service that sends a message with the fixed payload {@code "bar2"} to the
 * {@code output2} channel of the injected {@code EventSource}.
 */
@Service
public class Bar2Service {

    /** Source of the output channels; field-injected. */
    @Autowired
    private EventSource source;

    /**
     * Obtains the {@code output2} channel, builds a message whose payload is
     * {@code "bar2"}, and sends it on that channel.
     */
    public void bar2() {
        this.source.output2()
                .send(MessageBuilder
                        .withPayload("bar2")
                        .build());
    }
}
使用 DirectChannel 时,
绑定器会在同一线程上调用您的侦听器,因此 preSend
在这里是合适的。
但是,您不必借助 ThreadLocal
,可以直接通过方法签名访问 headers...
@StreamListener(Processor.INPUT)
public void handle(Foo foo, @Header("bar") String bar) {
...
}
编辑
/**
 * Example application: upper-cases every String handled on
 * {@code Processor.INPUT}, returns it for {@code Processor.OUTPUT}, and
 * registers a logging interceptor on the beans named "input" and "output".
 *
 * NOTE(review): the snippet shows only {@code @EnableBinding}; presumably the
 * full example also carries {@code @SpringBootApplication} - confirm.
 */
@EnableBinding(Processor.class)
public class So41459187Application {

    /**
     * Starts the application.
     *
     * @param args command-line arguments passed through to SpringApplication
     */
    public static void main(String[] args) {
        SpringApplication.run(So41459187Application.class, args);
    }

    /**
     * Converts the incoming payload to upper case.
     *
     * @param in payload handled on {@code Processor.INPUT}
     * @return the upper-cased payload, annotated to go to {@code Processor.OUTPUT}
     */
    @StreamListener(Processor.INPUT)
    @SendTo(Processor.OUTPUT)
    public String handle(String in) {
        return in.toUpperCase();
    }

    /**
     * Configuration that attaches logging interceptors to the bound channels.
     */
    @Configuration
    public static class Config {

        /**
         * Creates a post-processor that adds a {@code preSend} logging
         * interceptor to the beans named "input" and "output".
         *
         * @return the post-processor; every bean is returned unchanged
         */
        @Bean
        public BeanPostProcessor channelConfigurer() {
            return new BeanPostProcessor() {

                @Override
                public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
                    return bean;
                }

                @Override
                public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
                    // Guard the cast: any bean may be named "input"/"output", and a
                    // non-channel bean must pass through instead of failing with
                    // a ClassCastException.
                    if (bean instanceof AbstractMessageChannel) {
                        AbstractMessageChannel channel = (AbstractMessageChannel) bean;
                        if ("input".equals(beanName)) {
                            channel.addInterceptor(loggingInterceptor("INPUT"));
                        }
                        else if ("output".equals(beanName)) {
                            channel.addInterceptor(loggingInterceptor("OUTPUT"));
                        }
                    }
                    return bean;
                }
            };
        }

        /**
         * Builds an interceptor that prints each message in {@code preSend}
         * and passes it on unmodified.
         *
         * @param label channel label inserted into the printed line
         * @return the logging interceptor
         */
        private static ChannelInterceptorAdapter loggingInterceptor(final String label) {
            return new ChannelInterceptorAdapter() {
                @Override
                public Message<?> preSend(Message<?> message, MessageChannel channel) {
                    System.out.println("PreSend on " + label + ": " + message);
                    return message;
                }
            };
        }
    }
}
因为 org.springframework.messaging.support.ChannelInterceptor
的 postReceive 方法
不会在 org.springframework.messaging.SubscribableChannel
上被调用,有什么方法可以拦截发送到带有 @StreamListener(Sink.INPUT) 注解的方法的所有传入消息
?
例如:
在进入 handle
方法之前拦截消息
// Listener method annotated for the Sink.INPUT channel. The question asks how
// to intercept each incoming message before this method is entered.
@StreamListener(Sink.INPUT)
public void handle(Foo foo) {
// ...
}
以下是我对 Spring Cloud Stream
的设置
public interface EventSink {
String INPUT1 = "input1";
String INPUT2 = "input2";
@Input(INPUT1)
SubscribableChannel input1();
@Input(INPUT2)
SubscribableChannel input2();
}
public interface EventSource {
String OUTPUT1 = "output1";
String OUTPUT2 = "output2";
@Output(OUTPUT1)
MessageChannel output1();
@Output(OUTPUT2)
MessageChannel output2()';
}
# Binding configuration: each channel name declared in EventSink/EventSource
# is mapped to a destination of the same name.
spring:
  cloud:
    stream:
      bindings:
        input1:
          destination: input1
        input2:
          destination: input2
        output1:
          destination: output1
        output2:
          destination: output2
// Holds one @StreamListener method per input channel name declared in EventSink.
// NOTE(review): no stereotype or @EnableBinding annotation is visible on this
// class in the snippet; presumably it is registered elsewhere - confirm.
public class EventHandler {
// Listener annotated for EventSink.INPUT1; takes a Foo1 argument.
@StreamListener(EventSink.INPUT1)
public void handle(Foo1 foo) {
// ...
}
// Listener annotated for EventSink.INPUT2; overload taking a Foo2 argument.
@StreamListener(EventSink.INPUT2)
public void handle(Foo2 foo) {
// ...
}
}
/**
 * Service that sends a message with the fixed payload {@code "bar1"} to the
 * {@code output1} channel of the injected {@code EventSource}.
 */
@Service
public class Bar1Service {

    /** Source of the output channels; field-injected. */
    @Autowired
    private EventSource source;

    /**
     * Obtains the {@code output1} channel, builds a message whose payload is
     * {@code "bar1"}, and sends it on that channel.
     */
    public void bar1() {
        this.source.output1()
                .send(MessageBuilder
                        .withPayload("bar1")
                        .build());
    }
}
/**
 * Service that sends a message with the fixed payload {@code "bar2"} to the
 * {@code output2} channel of the injected {@code EventSource}.
 */
@Service
public class Bar2Service {

    /** Source of the output channels; field-injected. */
    @Autowired
    private EventSource source;

    /**
     * Obtains the {@code output2} channel, builds a message whose payload is
     * {@code "bar2"}, and sends it on that channel.
     */
    public void bar2() {
        this.source.output2()
                .send(MessageBuilder
                        .withPayload("bar2")
                        .build());
    }
}
使用 DirectChannel 时,
绑定器会在同一线程上调用您的侦听器,因此 preSend
在这里是合适的。
但是,您不必借助 ThreadLocal
,可以直接通过方法签名访问 headers...
@StreamListener(Processor.INPUT)
public void handle(Foo foo, @Header("bar") String bar) {
...
}
编辑
/**
 * Example application: upper-cases every String handled on
 * {@code Processor.INPUT}, returns it for {@code Processor.OUTPUT}, and
 * registers a logging interceptor on the beans named "input" and "output".
 *
 * NOTE(review): the snippet shows only {@code @EnableBinding}; presumably the
 * full example also carries {@code @SpringBootApplication} - confirm.
 */
@EnableBinding(Processor.class)
public class So41459187Application {

    /**
     * Starts the application.
     *
     * @param args command-line arguments passed through to SpringApplication
     */
    public static void main(String[] args) {
        SpringApplication.run(So41459187Application.class, args);
    }

    /**
     * Converts the incoming payload to upper case.
     *
     * @param in payload handled on {@code Processor.INPUT}
     * @return the upper-cased payload, annotated to go to {@code Processor.OUTPUT}
     */
    @StreamListener(Processor.INPUT)
    @SendTo(Processor.OUTPUT)
    public String handle(String in) {
        return in.toUpperCase();
    }

    /**
     * Configuration that attaches logging interceptors to the bound channels.
     */
    @Configuration
    public static class Config {

        /**
         * Creates a post-processor that adds a {@code preSend} logging
         * interceptor to the beans named "input" and "output".
         *
         * @return the post-processor; every bean is returned unchanged
         */
        @Bean
        public BeanPostProcessor channelConfigurer() {
            return new BeanPostProcessor() {

                @Override
                public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
                    return bean;
                }

                @Override
                public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
                    // Guard the cast: any bean may be named "input"/"output", and a
                    // non-channel bean must pass through instead of failing with
                    // a ClassCastException.
                    if (bean instanceof AbstractMessageChannel) {
                        AbstractMessageChannel channel = (AbstractMessageChannel) bean;
                        if ("input".equals(beanName)) {
                            channel.addInterceptor(loggingInterceptor("INPUT"));
                        }
                        else if ("output".equals(beanName)) {
                            channel.addInterceptor(loggingInterceptor("OUTPUT"));
                        }
                    }
                    return bean;
                }
            };
        }

        /**
         * Builds an interceptor that prints each message in {@code preSend}
         * and passes it on unmodified.
         *
         * @param label channel label inserted into the printed line
         * @return the logging interceptor
         */
        private static ChannelInterceptorAdapter loggingInterceptor(final String label) {
            return new ChannelInterceptorAdapter() {
                @Override
                public Message<?> preSend(Message<?> message, MessageChannel channel) {
                    System.out.println("PreSend on " + label + ": " + message);
                    return message;
                }
            };
        }
    }
}