Spring 云数据流上的 PolledProcessor 问题
Issue with PolledProcessor on Spring cloud data flow
我正在使用 PolledProcessor 实现 spring 云数据流处理器。我按照此处的示例 https://spring.io/blog/2018/02/27/spring-cloud-stream-2-0-polled-consumers。下面是我的代码。我将带有源管道的流部署到此处理器(源 | 轮询处理器)到 scdf,并让源发布了一些消息。我确认处理器每秒从 scdf rabbitmq 轮询消息,但 result
始终是 false
。我去了 scdf rabbitmq 控制台,我看到那些消息都在队列中。因此,尽管处理器在代码中不断轮询,但它并未轮询消息。我还看到队列没有消费者。看起来 scdf 没有将此处理器绑定到队列。知道为什么吗?
public interface PolledProcessor {
@Input
PollableMessageSource source();
@Output
MessageChannel dest();
}
@SpringBootApplication
@EnableBinding(PolledProcessor.class)
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Bean
public ApplicationRunner runner(PollableMessageSource source, MessageChannel dest) {
return args -> {
while (true) {
boolean result = source.poll(dest::send);
Thread.sleep(1000);
}
};
}
}
这里是源和处理器之间队列的状态
我已经测试了 Spring Cloud Stream 应用程序,没有任何问题:
@SpringBootApplication
@EnableBinding(Polled.class)
public class So69383266Application {
public static void main(String[] args) {
SpringApplication.run(So69383266Application.class, args);
}
@Bean
public ApplicationRunner runner(PollableMessageSource source) {
return args -> {
while (true) {
boolean result = source.poll(System.out::println);
System.out.println(result);
Thread.sleep(1000);
}
};
}
}
interface Polled {
@Input
PollableMessageSource source();
}
false
GenericMessage [payload=byte[6], headers={...
true
false
我建议你在AmqpMessageSource.doReceive()
中设置一个断点,看看发生了什么。
编辑
检查源是否正在使用正确队列的方法如下:
@Bean
public ApplicationRunner runner(PollableMessageSource source) {
return args -> {
while (true) {
DirectFieldAccessor dfa = new DirectFieldAccessor(source);
log.info(dfa.getPropertyValue("source.h.advised.targetSource.target.queue").toString());
boolean result = source.poll(System.out::println);
System.out.println(result);
Thread.sleep(1000);
}
};
}
我正在使用 PolledProcessor 实现 spring 云数据流处理器。我按照此处的示例 https://spring.io/blog/2018/02/27/spring-cloud-stream-2-0-polled-consumers。下面是我的代码。我将带有源管道的流部署到此处理器(源 | 轮询处理器)到 scdf,并让源发布了一些消息。我确认处理器每秒从 scdf rabbitmq 轮询消息,但 result
始终是 false
。我去了 scdf rabbitmq 控制台,我看到那些消息都在队列中。因此,尽管处理器在代码中不断轮询,但它并未轮询消息。我还看到队列没有消费者。看起来 scdf 没有将此处理器绑定到队列。知道为什么吗?
public interface PolledProcessor {
@Input
PollableMessageSource source();
@Output
MessageChannel dest();
}
@SpringBootApplication
@EnableBinding(PolledProcessor.class)
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Bean
public ApplicationRunner runner(PollableMessageSource source, MessageChannel dest) {
return args -> {
while (true) {
boolean result = source.poll(dest::send);
Thread.sleep(1000);
}
};
}
}
这里是源和处理器之间队列的状态
我已经测试了 Spring Cloud Stream 应用程序,没有任何问题:
@SpringBootApplication
@EnableBinding(Polled.class)
public class So69383266Application {
public static void main(String[] args) {
SpringApplication.run(So69383266Application.class, args);
}
@Bean
public ApplicationRunner runner(PollableMessageSource source) {
return args -> {
while (true) {
boolean result = source.poll(System.out::println);
System.out.println(result);
Thread.sleep(1000);
}
};
}
}
interface Polled {
@Input
PollableMessageSource source();
}
false
GenericMessage [payload=byte[6], headers={...
true
false
我建议你在AmqpMessageSource.doReceive()
中设置一个断点,看看发生了什么。
编辑
检查源是否正在使用正确队列的方法如下:
@Bean
public ApplicationRunner runner(PollableMessageSource source) {
return args -> {
while (true) {
DirectFieldAccessor dfa = new DirectFieldAccessor(source);
log.info(dfa.getPropertyValue("source.h.advised.targetSource.target.queue").toString());
boolean result = source.poll(System.out::println);
System.out.println(result);
Thread.sleep(1000);
}
};
}