Spring Cloud Stream Kafka Stream Binder,未创建 MessageChannel
Spring Cloud Stream Kafka Stream Binder, MessageChannel is not created
我的简单测试应用程序有问题。我想像 this.
这样的 kafka 流绑定器创建消费者
@SpringBootApplication
public class CloudStreamAggregatorApplication {
public static void main(String[] args) {
SpringApplication.run(CloudStreamAggregatorApplication.class, args);
}
@Bean
public Consumer<KStream<String,String>> consume() {
return input -> input.foreach((k,v) -> System.out.println("CONSUMER: "+v));
}
}
但是当我尝试测试它时
@SpringBootTest
@EmbeddedKafka
@Import(TestChannelBinderConfiguration.class)
@DirtiesContext
class CloudStreamConsumerApplicationTests {
@Autowired
private InputDestination input;
@Test
void test01_Consume() {
input.send(new GenericMessage<>("test"));
}
}
我收到异常
java.lang.IndexOutOfBoundsException: Index 0 out of bounds for length 0
at java.base/jdk.internal.util.Preconditions.outOfBounds(Preconditions.java:64) ~[na:na]
at java.base/jdk.internal.util.Preconditions.outOfBoundsCheckIndex(Preconditions.java:70) ~[na:na]
at java.base/jdk.internal.util.Preconditions.checkIndex(Preconditions.java:248) ~[na:na]
at java.base/java.util.Objects.checkIndex(Objects.java:359) ~[na:na]
at java.base/java.util.ArrayList.get(ArrayList.java:427) ~[na:na]
at org.springframework.cloud.stream.binder.test.AbstractDestination.getChannel(AbstractDestination.java:34) ~[spring-cloud-stream-3.1.0-test-binder.jar:3.1.0]
当我将消费方法更改为
@Bean
public Consumer<Flux<String>> consume() {
return f -> f.subscribe(p -> System.out.println("CONSUMER: "+p));
}
没关系。
我尝试打印创建的频道
@Autowired
private Map<String,MessageChannel> channels;
...
channels.forEach((k,v) -> System.out.println("CHANNEL: "+k));
我收到第一种情况
CHANNEL: nullChannel
CHANNEL: errorChannel
第二种情况
CHANNEL: nullChannel
CHANNEL: errorChannel
CHANNEL: consume-in-0
CHANNEL: test.anonymous.errors
我不明白为什么会这样。谁能帮助我!
Kafka Streams 绑定器不是 MessageChannelBinder
,因此它不在内部使用消息通道。
您无法使用测试消息通道绑定程序进行测试。
我的简单测试应用程序有问题。我想像 this.
这样的 kafka 流绑定器创建消费者@SpringBootApplication
public class CloudStreamAggregatorApplication {
public static void main(String[] args) {
SpringApplication.run(CloudStreamAggregatorApplication.class, args);
}
@Bean
public Consumer<KStream<String,String>> consume() {
return input -> input.foreach((k,v) -> System.out.println("CONSUMER: "+v));
}
}
但是当我尝试测试它时
@SpringBootTest
@EmbeddedKafka
@Import(TestChannelBinderConfiguration.class)
@DirtiesContext
class CloudStreamConsumerApplicationTests {
@Autowired
private InputDestination input;
@Test
void test01_Consume() {
input.send(new GenericMessage<>("test"));
}
}
我收到异常
java.lang.IndexOutOfBoundsException: Index 0 out of bounds for length 0
at java.base/jdk.internal.util.Preconditions.outOfBounds(Preconditions.java:64) ~[na:na]
at java.base/jdk.internal.util.Preconditions.outOfBoundsCheckIndex(Preconditions.java:70) ~[na:na]
at java.base/jdk.internal.util.Preconditions.checkIndex(Preconditions.java:248) ~[na:na]
at java.base/java.util.Objects.checkIndex(Objects.java:359) ~[na:na]
at java.base/java.util.ArrayList.get(ArrayList.java:427) ~[na:na]
at org.springframework.cloud.stream.binder.test.AbstractDestination.getChannel(AbstractDestination.java:34) ~[spring-cloud-stream-3.1.0-test-binder.jar:3.1.0]
当我将消费方法更改为
@Bean
public Consumer<Flux<String>> consume() {
return f -> f.subscribe(p -> System.out.println("CONSUMER: "+p));
}
没关系。
我尝试打印创建的频道
@Autowired
private Map<String,MessageChannel> channels;
...
channels.forEach((k,v) -> System.out.println("CHANNEL: "+k));
我收到第一种情况
CHANNEL: nullChannel
CHANNEL: errorChannel
第二种情况
CHANNEL: nullChannel
CHANNEL: errorChannel
CHANNEL: consume-in-0
CHANNEL: test.anonymous.errors
我不明白为什么会这样。谁能帮助我!
Kafka Streams 绑定器不是 MessageChannelBinder
,因此它不在内部使用消息通道。
您无法使用测试消息通道绑定程序进行测试。