Spring 集成队列通道容量错误
Spring Integration Queue-Channel Capacity wrong
我更多地玩 spring 集成,我非常感兴趣,但我认为有一个奇怪的行为,我找不到答案。
我有一个使用 Queue-Channel 的简单应用程序:
<int:channel id="ticketChannel" datatype="ch.elca.prototype.model.Ticket">
<int:queue capacity="1"/>
</int:channel>
我也试过 Rendezvous-Queue 效果一样:
<int:channel id="ticketChannel" datatype="ch.elca.prototype.model.Ticket">
<int:rendezvous-queue/>
</int:channel>
按我的理解,现在那个频道应该只能一次移动一条消息了。也许 2,如果你认为你有一个额外的容量。我不确定如何阅读它。但是我可以在不消耗的情况下向那个频道发送四次,这对我来说有点奇怪,我不明白容量。
见下文:
主要应用:
在这里,我流式传输 10 张票并为每张票调用 openTicket:
public static void main(final String[] args) throws InterruptedException {
try (ConfigurableApplicationContext context = SpringApplication.run(SassSimulatorApplication2.class, args)) {
final TicketGenerator generator = context.getBean(TicketGenerator.class);
final ProblemReporter reporter = context.getBean(ProblemReporter.class);
generator.createTickets().limit(10).forEach(reporter::openTicket);
context.close();
}
}
问题报告者:
public class ProblemReporter {
private volatile QueueChannel channel;
public synchronized void openTicket(final Ticket ticket){
final Message<Ticket> build = TicketMessageBuilder.buildMessage(ticket);
boolean send = channel.send(build);
System.out.println("send: " + send);
System.out.println("getQueueSize: " + channel.getQueueSize());
System.out.println("getSendCount: " + channel.getSendCount());
System.out.println("getReceiveCount: " + channel.getReceiveCount());
System.out.println("getSendErrorCount: " + channel.getSendErrorCount());
System.out.println("getRemainingCapacity: " + channel.getRemainingCapacity());
}
@Value("#{ticketChannel}")
public void setChannel(final QueueChannel channel) {
this.channel = channel;
}
}
启动应用程序时,我得到以下信息:
send: true
getQueueSize: 0
getSendCount: 0
getReceiveCount: 0
getSendErrorCount: 0
getRemainingCapacity: 1
send: true
getQueueSize: 0
getSendCount: 0
getReceiveCount: 0
getSendErrorCount: 0
getRemainingCapacity: 1
send: true
getQueueSize: 1
getSendCount: 0
getReceiveCount: 0
getSendErrorCount: 0
getRemainingCapacity: 0
send: true
getQueueSize: 1
getSendCount: 0
getReceiveCount: 0
getSendErrorCount: 0
getRemainingCapacity: 0
我正在使用 Spring-Boot 1.3.3,Sprint-Integration 4.2。5.RELEASE。我还尝试了 Spring-Boot 1.2.8 和 Spring-Integration 4.1.9.
这是预期的行为吗???
提前致谢。
看起来你的 channel.send(build, 30000);
是针对 local
变量完成的,而不是共享 bean
。
我的测试用例看起来像:
QueueChannel channel = new QueueChannel(3);
IntStream.range(0, 4)
.forEach(i -> {
boolean send = channel.send(new GenericMessage<>("test-" + i), 100);
System.out.println("send: " + send);
System.out.println("getQueueSize: " + channel.getQueueSize());
System.out.println("getRemainingCapacity: " + channel.getRemainingCapacity());
});
结果是:
send: true
getQueueSize: 1
getRemainingCapacity: 2
send: true
getQueueSize: 2
getRemainingCapacity: 1
send: true
getQueueSize: 3
getRemainingCapacity: 0
send: false
getQueueSize: 3
getRemainingCapacity: 0
注意:sendCount
(和类似)只能通过 @EnableIntegrationMBeanExport
或 @EnableIntegrationManagement
启用。
请参阅参考手册中的 Management。
你也可以在框架中找到一些关于这个问题的测试用例,例如QueueChannelTests
.
我更多地玩 spring 集成,我非常感兴趣,但我认为有一个奇怪的行为,我找不到答案。
我有一个使用 Queue-Channel 的简单应用程序:
<int:channel id="ticketChannel" datatype="ch.elca.prototype.model.Ticket">
<int:queue capacity="1"/>
</int:channel>
我也试过 Rendezvous-Queue 效果一样:
<int:channel id="ticketChannel" datatype="ch.elca.prototype.model.Ticket">
<int:rendezvous-queue/>
</int:channel>
按我的理解,现在那个频道应该只能一次移动一条消息了。也许 2,如果你认为你有一个额外的容量。我不确定如何阅读它。但是我可以在不消耗的情况下向那个频道发送四次,这对我来说有点奇怪,我不明白容量。
见下文:
主要应用: 在这里,我流式传输 10 张票并为每张票调用 openTicket:
public static void main(final String[] args) throws InterruptedException {
try (ConfigurableApplicationContext context = SpringApplication.run(SassSimulatorApplication2.class, args)) {
final TicketGenerator generator = context.getBean(TicketGenerator.class);
final ProblemReporter reporter = context.getBean(ProblemReporter.class);
generator.createTickets().limit(10).forEach(reporter::openTicket);
context.close();
}
}
问题报告者:
public class ProblemReporter {
private volatile QueueChannel channel;
public synchronized void openTicket(final Ticket ticket){
final Message<Ticket> build = TicketMessageBuilder.buildMessage(ticket);
boolean send = channel.send(build);
System.out.println("send: " + send);
System.out.println("getQueueSize: " + channel.getQueueSize());
System.out.println("getSendCount: " + channel.getSendCount());
System.out.println("getReceiveCount: " + channel.getReceiveCount());
System.out.println("getSendErrorCount: " + channel.getSendErrorCount());
System.out.println("getRemainingCapacity: " + channel.getRemainingCapacity());
}
@Value("#{ticketChannel}")
public void setChannel(final QueueChannel channel) {
this.channel = channel;
}
}
启动应用程序时,我得到以下信息:
send: true
getQueueSize: 0
getSendCount: 0
getReceiveCount: 0
getSendErrorCount: 0
getRemainingCapacity: 1
send: true
getQueueSize: 0
getSendCount: 0
getReceiveCount: 0
getSendErrorCount: 0
getRemainingCapacity: 1
send: true
getQueueSize: 1
getSendCount: 0
getReceiveCount: 0
getSendErrorCount: 0
getRemainingCapacity: 0
send: true
getQueueSize: 1
getSendCount: 0
getReceiveCount: 0
getSendErrorCount: 0
getRemainingCapacity: 0
我正在使用 Spring-Boot 1.3.3,Sprint-Integration 4.2。5.RELEASE。我还尝试了 Spring-Boot 1.2.8 和 Spring-Integration 4.1.9.
这是预期的行为吗???
提前致谢。
看起来你的 channel.send(build, 30000);
是针对 local
变量完成的,而不是共享 bean
。
我的测试用例看起来像:
QueueChannel channel = new QueueChannel(3);
IntStream.range(0, 4)
.forEach(i -> {
boolean send = channel.send(new GenericMessage<>("test-" + i), 100);
System.out.println("send: " + send);
System.out.println("getQueueSize: " + channel.getQueueSize());
System.out.println("getRemainingCapacity: " + channel.getRemainingCapacity());
});
结果是:
send: true
getQueueSize: 1
getRemainingCapacity: 2
send: true
getQueueSize: 2
getRemainingCapacity: 1
send: true
getQueueSize: 3
getRemainingCapacity: 0
send: false
getQueueSize: 3
getRemainingCapacity: 0
注意:sendCount
(和类似)只能通过 @EnableIntegrationMBeanExport
或 @EnableIntegrationManagement
启用。
请参阅参考手册中的 Management。
你也可以在框架中找到一些关于这个问题的测试用例,例如QueueChannelTests
.