无法在主题中创建持久订阅者
Cannot create durable subscriber in Topic
参考相关示例,我使用 JMS + ActiveMQ + Spring Boot 创建持久订阅者,并使用 UUID.randomUUID().toString() 生成了唯一的客户端 ID。但是,在启动订阅者(接收者)时,应用程序抛出了以下警告消息:
Cause: Durable consumer is in use for client: 8f1019fd-50d4-457b-b417-2058917ed7bb and subscriptionName: org.springframework.jms.listener.adapter.MessagingMessageListenerAdapter
请帮我删除上面的警告信息。
在 ActiveMQ 服务器中创建客户端 ID 为“8f1019fd-50d4-457b-b417-2058917ed7bb”的持久主题
以下为源码
ReceiverTopicApplicaton.java
@SpringBootApplication
@EnableJms
public class ReceiverTopicApplicaton {
public static void main(String[] args) {
SpringApplication.run(ReceiverTopicApplicaton.class, args);
}
private ConnectionFactory connectionFactory() {
return new ActiveMQConnectionFactory("tcp://localhost:61616");
}
@Bean
public DefaultJmsListenerContainerFactory topicListenerFactory() {
DefaultJmsListenerContainerFactory factory =
new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory());
//we need to set destinationResolver() to remove the warning message
factory.setDestinationResolver(destinationResolver());
factory.setPubSubDomain(true);
factory.setSubscriptionDurable(true);
factory.setConcurrency("3-10");
factory.setClientId(UUID.randomUUID().toString());
factory.setSubscriptionDurable(true);
return factory;
}
Receiver.java
/**
 * JMS listener component that prints every {@code StudentDto} received as an object message.
 */
@Component
public class Receiver {

    /**
     * Handles a message delivered from the {@code durable.topic} destination through the
     * {@code topicListenerFactory} container factory.
     *
     * <p>NOTE(review): the publisher shown alongside this class sends to {@code student.topic};
     * confirm that the two destination names are meant to differ.
     *
     * @param message the incoming JMS message; anything that is not an {@code ObjectMessage} is skipped
     * @throws JMSException if the payload cannot be read from the message
     */
    @JmsListener(destination = "durable.topic", containerFactory = "topicListenerFactory")
    public void receiveMessage(final Message message) throws JMSException {
        if (!(message instanceof ObjectMessage)) {
            // Non-object messages produce no output.
            return;
        }
        ObjectMessage objectMessage = (ObjectMessage) message;
        StudentDto studentDto = (StudentDto) objectMessage.getObject();
        System.out.println("Receiver :: Student Object Received..." + studentDto);
    }
}
StudentDto.java
/**
 * Serializable data holder describing a student; it is the payload exchanged between
 * the publisher and the receiver.
 */
public class StudentDto implements Serializable {
    private Long studentId;
    private String studentName;
    private String gender;
    private Long age;
    private String studentClass;
    private LocalDate birthDate;

    /** Creates an instance with every field left {@code null}. */
    public StudentDto() {
    }

    /**
     * Creates a fully populated instance.
     *
     * @param studentId    identifier of the student
     * @param studentName  name of the student
     * @param gender       gender of the student
     * @param age          age of the student
     * @param studentClass class the student attends
     * @param birthDate    date of birth
     */
    public StudentDto(Long studentId, String studentName, String gender, Long age, String studentClass,
            LocalDate birthDate) {
        this.studentId = studentId;
        this.studentName = studentName;
        this.gender = gender;
        this.age = age;
        this.studentClass = studentClass;
        this.birthDate = birthDate;
    }

    /**
     * Renders all fields as {@code Student [studentId=..., ..., birthDate=...]}.
     *
     * @return the textual form of this student; unset fields appear as {@code null}
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("Student [");
        text.append("studentId=").append(studentId);
        text.append(", studentName=").append(studentName);
        text.append(", Gender=").append(gender);
        text.append(", Age=").append(age);
        text.append(", studentClass=").append(studentClass);
        text.append(", birthDate=").append(birthDate);
        text.append("]");
        return text.toString();
    }
}
SendMessageApplication.java
@SpringBootApplication
@EnableJms
public class SendMessageApplication {
public static void main(String[] args) {
SpringApplication.run(SendMessageApplication.class, args);
}
@Bean
public JmsListenerContainerFactory<?> topicListenerFactory(
ConnectionFactory connectionFactory,
DefaultJmsListenerContainerFactoryConfigurer configurer)
{
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
configurer.configure(factory, connectionFactory);
factory.setPubSubDomain(true);
return factory;
}
TopicSendMessage.java
/**
 * REST controller mapped under {@code /schoolDashboard/topic} that publishes a sample
 * {@code StudentDto} through the injected {@code JmsTemplate}.
 */
@RestController
@RequestMapping(path = "/schoolDashboard/topic")
class TopicSendMessage {
    @Autowired
    private JmsTemplate jmsTemplate;

    /**
     * Handles {@code GET /publishMessage}: builds a fixed sample student and sends it to
     * the {@code student.topic} destination.
     *
     * @throws Exception if building or sending the message fails
     */
    @GetMapping(path = "/publishMessage")
    public void sendMessage() throws Exception {
        String birthDate = "1978-10-05";
        // long literals are auto-boxed; the deprecated Long(long) constructor is no longer used.
        StudentDto studentDto = new StudentDto(1L, "Ritesh", "Male", 12L, "Class-V", LocalDate.parse(birthDate));
        System.out.println("TopicSendMessage.java :: Topic - Publishing Student Object....");
        // Switch the template to the publish/subscribe domain before sending.
        jmsTemplate.setPubSubDomain(true);
        jmsTemplate.convertAndSend("student.topic", studentDto);
    }
}
由于您使用的是 ActiveMQ 5.x,因此您使用的是 JMS 1.1,而 JMS 1.1 规范规定一个持久订阅上只能附加一个订阅者。由于您正在使用 setConcurrency("3-10"),
Spring 会尝试在同一个持久订阅上创建多个订阅者,这就导致了您看到的错误。您应该采取以下两种做法之一:
- 使用 setConcurrency("1")。这可能会导致性能显著下降。
- 迁移到支持 JMS 2.0 的代理(例如 ActiveMQ Artemis)并调用 setSubscriptionShared(true),因为 JMS 2.0 对持久订阅的多个订阅者没有 JMS 1.1 那样的限制。
参考
Cause: Durable consumer is in use for client: 8f1019fd-50d4-457b-b417-2058917ed7bb and subscriptionName: org.springframework.jms.listener.adapter.MessagingMessageListenerAdapter
请帮我删除上面的警告信息。
在 ActiveMQ 服务器中创建客户端 ID 为“8f1019fd-50d4-457b-b417-2058917ed7bb”的持久主题
ReceiverTopicApplicaton.java
@SpringBootApplication
@EnableJms
public class ReceiverTopicApplicaton {
public static void main(String[] args) {
SpringApplication.run(ReceiverTopicApplicaton.class, args);
}
private ConnectionFactory connectionFactory() {
return new ActiveMQConnectionFactory("tcp://localhost:61616");
}
@Bean
public DefaultJmsListenerContainerFactory topicListenerFactory() {
DefaultJmsListenerContainerFactory factory =
new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory());
//we need to set destinationResolver() to remove the warning message
factory.setDestinationResolver(destinationResolver());
factory.setPubSubDomain(true);
factory.setSubscriptionDurable(true);
factory.setConcurrency("3-10");
factory.setClientId(UUID.randomUUID().toString());
factory.setSubscriptionDurable(true);
return factory;
}
Receiver.java
/**
 * JMS listener component that prints every {@code StudentDto} received as an object message.
 */
@Component
public class Receiver {

    /**
     * Handles a message delivered from the {@code durable.topic} destination through the
     * {@code topicListenerFactory} container factory.
     *
     * <p>NOTE(review): the publisher shown alongside this class sends to {@code student.topic};
     * confirm that the two destination names are meant to differ.
     *
     * @param message the incoming JMS message; anything that is not an {@code ObjectMessage} is skipped
     * @throws JMSException if the payload cannot be read from the message
     */
    @JmsListener(destination = "durable.topic", containerFactory = "topicListenerFactory")
    public void receiveMessage(final Message message) throws JMSException {
        if (!(message instanceof ObjectMessage)) {
            // Non-object messages produce no output.
            return;
        }
        ObjectMessage objectMessage = (ObjectMessage) message;
        StudentDto studentDto = (StudentDto) objectMessage.getObject();
        System.out.println("Receiver :: Student Object Received..." + studentDto);
    }
}
StudentDto.java
/**
 * Serializable data holder describing a student; it is the payload exchanged between
 * the publisher and the receiver.
 */
public class StudentDto implements Serializable {
    private Long studentId;
    private String studentName;
    private String gender;
    private Long age;
    private String studentClass;
    private LocalDate birthDate;

    /** Creates an instance with every field left {@code null}. */
    public StudentDto() {
    }

    /**
     * Creates a fully populated instance.
     *
     * @param studentId    identifier of the student
     * @param studentName  name of the student
     * @param gender       gender of the student
     * @param age          age of the student
     * @param studentClass class the student attends
     * @param birthDate    date of birth
     */
    public StudentDto(Long studentId, String studentName, String gender, Long age, String studentClass,
            LocalDate birthDate) {
        this.studentId = studentId;
        this.studentName = studentName;
        this.gender = gender;
        this.age = age;
        this.studentClass = studentClass;
        this.birthDate = birthDate;
    }

    /**
     * Renders all fields as {@code Student [studentId=..., ..., birthDate=...]}.
     *
     * @return the textual form of this student; unset fields appear as {@code null}
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("Student [");
        text.append("studentId=").append(studentId);
        text.append(", studentName=").append(studentName);
        text.append(", Gender=").append(gender);
        text.append(", Age=").append(age);
        text.append(", studentClass=").append(studentClass);
        text.append(", birthDate=").append(birthDate);
        text.append("]");
        return text.toString();
    }
}
SendMessageApplication.java
@SpringBootApplication
@EnableJms
public class SendMessageApplication {
public static void main(String[] args) {
SpringApplication.run(SendMessageApplication.class, args);
}
@Bean
public JmsListenerContainerFactory<?> topicListenerFactory(
ConnectionFactory connectionFactory,
DefaultJmsListenerContainerFactoryConfigurer configurer)
{
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
configurer.configure(factory, connectionFactory);
factory.setPubSubDomain(true);
return factory;
}
TopicSendMessage.java
/**
 * REST controller mapped under {@code /schoolDashboard/topic} that publishes a sample
 * {@code StudentDto} through the injected {@code JmsTemplate}.
 */
@RestController
@RequestMapping(path = "/schoolDashboard/topic")
class TopicSendMessage {
    @Autowired
    private JmsTemplate jmsTemplate;

    /**
     * Handles {@code GET /publishMessage}: builds a fixed sample student and sends it to
     * the {@code student.topic} destination.
     *
     * @throws Exception if building or sending the message fails
     */
    @GetMapping(path = "/publishMessage")
    public void sendMessage() throws Exception {
        String birthDate = "1978-10-05";
        // long literals are auto-boxed; the deprecated Long(long) constructor is no longer used.
        StudentDto studentDto = new StudentDto(1L, "Ritesh", "Male", 12L, "Class-V", LocalDate.parse(birthDate));
        System.out.println("TopicSendMessage.java :: Topic - Publishing Student Object....");
        // Switch the template to the publish/subscribe domain before sending.
        jmsTemplate.setPubSubDomain(true);
        jmsTemplate.convertAndSend("student.topic", studentDto);
    }
}
由于您使用的是 ActiveMQ 5.x,因此您使用的是 JMS 1.1,而 JMS 1.1 规范规定一个持久订阅上只能附加一个订阅者。由于您正在使用 setConcurrency("3-10"),
Spring 会尝试在同一个持久订阅上创建多个订阅者,这就导致了您看到的错误。您应该采取以下两种做法之一:
- 使用 setConcurrency("1")。这可能会导致性能显著下降。
- 迁移到支持 JMS 2.0 的代理(例如 ActiveMQ Artemis)并调用 setSubscriptionShared(true),因为 JMS 2.0 对持久订阅的多个订阅者没有 JMS 1.1 那样的限制。