ActiveMQ Artemis prefixes "jms.topic." to all topic names defined on a Spring Boot client
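I am using ActiveMQ Artemis 2.18.0 together with version 2.5.5 of the spring-boot-starter-artemis dependency on the Spring Boot client. In my use case, clients need to communicate with each other through topics. The problem is that the string jms.topic. is being prefixed to every topic defined on the client. For example, the topic foo.sendInfo becomes jms.topic.foo.sendInfo.
The broker.xml file is shown below. The acceptor used by the Spring Boot client is the netty-ssl-acceptor on port 61617.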
<?xml version='1.0'?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<configuration xmlns="urn:activemq"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:xi="http://www.w3.org/2001/XInclude"
xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">
<core xmlns="urn:activemq:core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:activemq:core ">
<name>0.0.0.0</name>
<persistence-enabled>true</persistence-enabled>
<journal-type>NIO</journal-type>
<paging-directory>data/paging</paging-directory>
<bindings-directory>data/bindings</bindings-directory>
<journal-directory>data/journal</journal-directory>
<large-messages-directory>data/large-messages</large-messages-directory>
<journal-datasync>true</journal-datasync>
<journal-min-files>2</journal-min-files>
<journal-pool-files>10</journal-pool-files>
<journal-device-block-size>4096</journal-device-block-size>
<journal-file-size>10M</journal-file-size>
<!--
This value was determined through a calculation.
Your system could perform 1.18 writes per millisecond
on the current journal configuration.
That translates as a sync write every 844000 nanoseconds.
Note: If you specify 0 the system will perform writes directly to the disk.
We recommend this to be 0 if you are using journalType=MAPPED and journal-datasync=false.
-->
<journal-buffer-timeout>844000</journal-buffer-timeout>
<journal-max-io>1</journal-max-io>
<!-- how often we are looking for how many bytes are being used on the disk in ms -->
<disk-scan-period>5000</disk-scan-period>
<!-- once the disk hits this limit the system will block, or close the connection in certain protocols
that won't support flow control. -->
<max-disk-usage>90</max-disk-usage>
<!-- should the broker detect dead locks and other issues -->
<critical-analyzer>true</critical-analyzer>
<critical-analyzer-timeout>120000</critical-analyzer-timeout>
<critical-analyzer-check-period>60000</critical-analyzer-check-period>
<critical-analyzer-policy>HALT</critical-analyzer-policy>
<page-sync-timeout>844000</page-sync-timeout>
<acceptors>
<!-- useEpoll means: it will use Netty epoll if you are on a system (Linux) that supports it -->
<!-- amqpCredits: The number of credits sent to AMQP producers -->
<!-- amqpLowCredits: The server will send the # credits specified at amqpCredits at this low mark -->
<!-- amqpDuplicateDetection: If you are not using duplicate detection, set this to false
as duplicate detection requires applicationProperties to be parsed on the server. -->
<!-- amqpMinLargeMessageSize: Determines how many bytes are considered large, so we start using files to hold their data.
default: 102400, -1 would mean to disable large mesasge control -->
<!-- Note: If an acceptor needs to be compatible with HornetQ and/or Artemis 1.x clients add
"anycastPrefix=jms.queue.;multicastPrefix=jms.topic." to the acceptor url.
See https://issues.apache.org/jira/browse/ARTEMIS-1644 for more information. -->
<!-- Acceptor for every supported protocol -->
<acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;amqpMinLargeMessageSize=102400;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpDuplicateDetection=true;supportAdvisory=false;suppressInternalManagementObjects=false</acceptor>
<!-- AMQP Acceptor. Listens on default AMQP port for AMQP traffic.-->
<acceptor name="amqp">tcp://0.0.0.0:5672?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpMinLargeMessageSize=102400;amqpDuplicateDetection=true</acceptor>
<!-- STOMP Acceptor. -->
<acceptor name="stomp">tcp://0.0.0.0:61613?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=STOMP;useEpoll=true</acceptor>
<!-- HornetQ Compatibility Acceptor. Enables HornetQ Core and STOMP for legacy HornetQ clients. -->
<acceptor name="hornetq">tcp://0.0.0.0:5445?anycastPrefix=jms.queue.;multicastPrefix=jms.topic.;protocols=HORNETQ,STOMP;useEpoll=true</acceptor>
<!-- MQTT Acceptor -->
<acceptor name="mqtt">tcp://0.0.0.0:1883?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=MQTT;useEpoll=false</acceptor>
<!-- SSL Acceptor -->
<acceptor name="netty-ssl-acceptor">tcp://0.0.0.0:61617?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;anycastPrefix=jms.queue;multicastPrefix=jms.topic.;sslEnabled=true;keyStorePath=E:/apache-artemis-2.18.0/bin/localBroker/etc/sprink.jks;keyStorePassword=changeit;trustStorePath=E:/apache-artemis-2.18.0/bin/localBroker/etc/sprinktrust.ts;trustStorePassword=changeit;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE</acceptor>
<acceptor name ="mqtt+ssl">tcp://0.0.0.0:8883?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;sslEnabled=true;keyStorePath=E:/apache-artemis-2.18.0/bin/localBroker/etc/sprink.jks;keyStorePassword=changeit;trustStorePath=E:/apache-artemis-2.18.0/bin/localBroker/etc/sprinktrust.ts;trustStorePassword=changeit;needClientAuth=true;protocols=MQTT;useEpoll=true</acceptor>
</acceptors>
<security-settings>
<security-setting match="#">
<permission type="createNonDurableQueue" roles="admins, users"/>
<permission type="deleteNonDurableQueue" roles="admins, users"/>
<permission type="createDurableQueue" roles="admins, users"/>
<permission type="deleteDurableQueue" roles="admins, users"/>
<permission type="createAddress" roles="admins, users"/>
<permission type="deleteAddress" roles="admins, users"/>
<permission type="consume" roles="admins, users"/>
<permission type="browse" roles="admins, users"/>
<permission type="send" roles="admins, users"/>
<!-- we need this otherwise ./artemis data imp wouldn't work -->
<permission type="manage" roles="admins"/>
</security-setting>
</security-settings>
<address-settings>
<!-- if you define auto-create on certain queues, management has to be auto-create -->
<address-setting match="activemq.management#">
<dead-letter-address>DLQ</dead-letter-address>
<expiry-address>ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<!-- with -1 only the global-max-size is in use for limiting -->
<max-size-bytes>-1</max-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>PAGE</address-full-policy>
<auto-create-queues>true</auto-create-queues>
<auto-create-addresses>true</auto-create-addresses>
<auto-create-jms-queues>true</auto-create-jms-queues>
<auto-create-jms-topics>true</auto-create-jms-topics>
</address-setting>
<!--default for catch all-->
<address-setting match="#">
<dead-letter-address>DLQ</dead-letter-address>
<expiry-address>ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<!-- with -1 only the global-max-size is in use for limiting -->
<max-size-bytes>-1</max-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>PAGE</address-full-policy>
<auto-create-queues>true</auto-create-queues>
<auto-create-addresses>true</auto-create-addresses>
<auto-create-jms-queues>true</auto-create-jms-queues>
<auto-create-jms-topics>true</auto-create-jms-topics>
<auto-delete-queues>false</auto-delete-queues>
<auto-delete-addresses>false</auto-delete-addresses>
</address-setting>
</address-settings>
<addresses>
<address name="DLQ">
<anycast>
<queue name="DLQ" />
</anycast>
</address>
<address name="ExpiryQueue">
<anycast>
<queue name="ExpiryQueue" />
</anycast>
</address>
</addresses>
</core>
</configuration>
The connection factory configuration on the Spring Boot client is shown below.
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.core.JmsTemplate;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
@Configuration
@EnableJms
public class MQTTConfig {
@Value("${JMS_BROKER_TRUSTSTORE}")
private String pathToTrustStore;
@Value("${JMS_BROKER_KEYSTORE}")
private String pathToKeystore;
@Value("${JMS_BROKER_TRUSTSTORE_PASSWORD}")
private String truststorePassword;
@Value("${JMS_BROKER_KEYSTORE_PASSWORD}")
private String keystorePassword;
@Bean
public ActiveMQConnectionFactory artemisSSLConnectionFactory() {
ActiveMQConnectionFactory artemisConnectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61617?&" + "sslEnabled=true&" +
"trustStorePath=" + pathToTrustStore + "&trustStorePassword=changeit");
artemisConnectionFactory.setUser("user");
artemisConnectionFactory.setPassword("password");
return artemisConnectionFactory;
}
/**
* Initialise {@link JmsTemplate} as required
*/
@Bean
public JmsTemplate jmsTemplate() throws JMSException {
JmsTemplate jmsTemplate = new JmsTemplate();
jmsTemplate.setConnectionFactory(artemisSSLConnectionFactory());
jmsTemplate.setExplicitQosEnabled(true);
//setting PuSubDomain to true configures JmsTemplate to work with topics instead of queues
jmsTemplate.setPubSubDomain(true);
return jmsTemplate;
}
/**
* Initialise {@link DefaultJmsListenerContainerFactory} as required
*/
@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() throws JMSException {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(artemisSSLConnectionFactory());
//setting PuSubDomain to true configures the DefaultJmsListenerContainerFactory to work with topics instead of queues
factory.setPubSubDomain(true);
return factory;
}
}
Below is the POM file, showing only the relevant dependencies.
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.4.1.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-artemis</artifactId>
<version>2.5.5</version>
</dependency>
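The snippet below shows a producer that publishes to the topic server.weatherForecast and a consumer subscribed to the same topic. Messages are exchanged between the producer and the consumer without any problem, because jms.topic. is prefixed to every topic defined on the Spring Boot client. However, when I subscribe to MQTT messages with an external tool, no messages arrive on the topic defined in the tool unless the subscribed topic is changed from server.weatherForecast to jms.topic.server.weatherForecast.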
import org.json.JSONObject; // assumption: the original does not say which JSON library provides JSONObject
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;
@Component
public class SamplePC {
    @Autowired
    private JmsTemplate jmsTemplate;
    //producer that is called by a cron job
    public void tester() {
        JSONObject jsonObject = new JSONObject();
        jsonObject.put("serialNumber", "105");
        jmsTemplate.convertAndSend("server/forecast", jsonObject.toString().toCharArray());
    }
    //consumer (a message from the producer should be received here, but nothing arrives)
    @JmsListener(destination = "server/forecast")
    private void consumeWeatherForecastRequest(char[] incomingMessage) {
        //some logic
        jmsTemplate.convertAndSend("someTopic", "someMessage");
    }
}
After enabling TRACE logging for RemotingConnectionImpl, I saw that the serverVersion property in CreateSessionResponseMessage has the value 131, while the version property in CreateSessionMessage has the value 127.
How can I make sure that jms.topic. is not prefixed to the topic names?
A minimal reproducible example can be downloaded from this GitHub repository.
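I tried to log the prefix in the code but found no way to do it; all logs show only the topic name without the prefix. However, an external client that subscribes to the topic being published to has to specify the prefix. When subscribing to both topicName and jms.topic.topicName, it is clear that the messages are delivered to the latter. I have noticed that some clients parse "." as "/", so if "." does not work, the other separator may be worth trying.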
You have defined "anycastPrefix=jms.queue;multicastPrefix=jms.topic." in your SSL acceptor. You should remove them.
Another solution is to set setEnableAmq1Prefix to false on your connection factory (but I think that is the default).
I worked with your reproducer and managed to re-create the problem you're seeing, where the client is using jms.topic.test.topic. However, once I added multicastPrefix=jms.topic. to the "artemis" acceptor in broker.xml, the problem went away. The broker now strips the prefix sent by the client and uses test.topic instead.
You did set multicastPrefix=jms.topic. on the "netty-ssl-acceptor" acceptor, but your client isn't actually using that acceptor.
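As an illustration of the change described above, the fragment below takes the stock "artemis" acceptor from the broker.xml in the question and appends multicastPrefix=jms.topic. to its URL. It is a sketch rather than a verified configuration: every other URL parameter is copied unchanged from the question. The note in the stock broker.xml also suggests anycastPrefix=jms.queue. for 1.x clients that use queues; it is left out here because only the topic prefix was tested.

```xml
<!-- "artemis" acceptor with the 1.x topic prefix declared, so that the broker
     strips "jms.topic." from addresses sent by clients connecting on port 61616 -->
<acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;amqpMinLargeMessageSize=102400;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpDuplicateDetection=true;supportAdvisory=false;suppressInternalManagementObjects=false;multicastPrefix=jms.topic.</acceptor>
```

The prefix setting is per acceptor, so it only takes effect for clients that connect through the acceptor on which it is declared.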
I also ran mvn dependency:tree to see why your application is using the ActiveMQ Artemis 1.3.0 client. Here is the (partial) output:
[INFO] \- org.springframework.boot:spring-boot-starter-artemis:jar:2.5.5:compile
[INFO] +- jakarta.jms:jakarta.jms-api:jar:2.0.3:compile
[INFO] +- jakarta.json:jakarta.json-api:jar:1.1.6:compile
[INFO] \- org.apache.activemq:artemis-jms-client:jar:1.3.0:compile
[INFO] +- org.apache.activemq:artemis-core-client:jar:1.3.0:compile
[INFO] | +- org.jgroups:jgroups:jar:3.6.9.Final:compile
[INFO] | +- org.apache.activemq:artemis-commons:jar:1.3.0:compile
[INFO] | | +- commons-beanutils:commons-beanutils:jar:1.9.2:compile
[INFO] | | | \- commons-collections:commons-collections:jar:3.2.2:compile
[INFO] | | \- com.google.guava:guava:jar:18.0:compile
[INFO] | \- io.netty:netty-all:jar:4.0.32.Final:compile
[INFO] +- org.apache.activemq:artemis-selector:jar:1.3.0:compile
[INFO] \- javax.inject:javax.inject:jar:1:compile
So it looks like the dependency on org.apache.activemq:artemis-jms-client:jar:1.3.0 comes directly from org.springframework.boot:spring-boot-starter-artemis:jar:2.5.5, which is really strange because it has clearly defined a dependency on org.apache.activemq:artemis-jms-client:jar:2.17.0. However, if I change the <parent> to use 2.5.5 instead of 1.4.1.RELEASE, the problem goes away, e.g.:
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.5.5</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
Here is what mvn dependency:tree outputs now (partial):
[INFO] \- org.springframework.boot:spring-boot-starter-artemis:jar:2.5.5:compile
[INFO] +- jakarta.jms:jakarta.jms-api:jar:2.0.3:compile
[INFO] +- jakarta.json:jakarta.json-api:jar:1.1.6:compile
[INFO] \- org.apache.activemq:artemis-jms-client:jar:2.17.0:compile
[INFO] +- org.apache.activemq:artemis-core-client:jar:2.17.0:compile
[INFO] | +- org.jgroups:jgroups:jar:3.6.13.Final:compile
[INFO] | +- org.apache.johnzon:johnzon-core:jar:1.2.14:compile
[INFO] | +- io.netty:netty-transport-native-epoll:jar:linux-x86_64:4.1.68.Final:compile
[INFO] | | \- io.netty:netty-transport-native-unix-common:jar:4.1.68.Final:compile
[INFO] | +- io.netty:netty-transport-native-kqueue:jar:osx-x86_64:4.1.68.Final:compile
[INFO] | +- io.netty:netty-codec-http:jar:4.1.68.Final:compile
[INFO] | +- io.netty:netty-buffer:jar:4.1.68.Final:compile
[INFO] | +- io.netty:netty-transport:jar:4.1.68.Final:compile
[INFO] | | \- io.netty:netty-resolver:jar:4.1.68.Final:compile
[INFO] | +- io.netty:netty-handler:jar:4.1.68.Final:compile
[INFO] | +- io.netty:netty-handler-proxy:jar:4.1.68.Final:compile
[INFO] | +- io.netty:netty-codec:jar:4.1.68.Final:compile
[INFO] | +- io.netty:netty-codec-socks:jar:4.1.68.Final:compile
[INFO] | \- io.netty:netty-common:jar:4.1.68.Final:compile
[INFO] +- org.apache.activemq:artemis-commons:jar:2.17.0:compile
[INFO] | +- org.jboss.logging:jboss-logging:jar:3.4.2.Final:compile
[INFO] | \- commons-beanutils:commons-beanutils:jar:1.9.4:compile
[INFO] | \- commons-collections:commons-collections:jar:3.2.2:compile
[INFO] \- org.apache.activemq:artemis-selector:jar:2.17.0:compile
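One likely explanation, offered as an assumption rather than something confirmed against the reproducer: versions declared in the parent POM's dependency management take precedence over the versions requested by transitive dependencies, so a 1.4.1.RELEASE parent can pin artemis-jms-client to the version it manages even though the 2.5.5 starter asks for 2.17.0. With the parent at 2.5.5, a minimal sketch of the relevant POM parts can leave the starter's version to the parent as well, which keeps the two from drifting apart again:

```xml
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.5.5</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>

<dependencies>
    <!-- no <version> here: the parent's dependency management supplies it -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-artemis</artifactId>
    </dependency>
</dependencies>
```

Running mvn dependency:tree again after such a change is a quick way to check which artemis-jms-client version is actually resolved.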