我们可以使用 spring 集成在 mosquitto 中批量加载 10 条消息吗
can we batch up groups of 10 message load in mosquitto using spring integration
这就是我使用 spring 定义我的 mqtt 连接的方式 integration.i 我不确定这是否可行 bt 我们能否设置一个 mqtt 订阅者在收到 10 条消息后工作。现在订阅者在发布消息后正常工作。
@Autowired
ConnectorConfig config;
@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
factory.setServerURIs(config.getUrl());
factory.setUserName(config.getUser());
factory.setPassword(config.getPass());
return factory;
}
@Bean
public MessageProducer inbound() {
MqttPahoMessageDrivenChannelAdapter adapter =
new MqttPahoMessageDrivenChannelAdapter(config.getClientid(), mqttClientFactory(), "ALERT", "READING");
adapter.setCompletionTimeout(5000);
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setQos(1);
adapter.setOutputChannel(mqttRouterChannel());
return adapter;
}
/**this is router**/
@MessageEndpoint
public class MessageRouter {
private final Logger logger = LoggerFactory.getLogger(MessageRouter.class);
static final String ALERT = "ALERT";
static final String READING = "READING";
@Router(inputChannel = "mqttRouterChannel")
public String route(@Header("mqtt_topic") String topic){
String route = null;
switch (topic){
case ALERT:
logger.info("alert message received");
route = "alertTransformerChannel";
break;
case READING:
logger.info("reading message received");
route = "readingTransformerChannel";
break;
}
return route;
}
}
i need to batch up groups of 10 messages at a time
这不是 MqttPahoMessageDrivenChannelAdapter
的责任。
我们在那里使用 MqttCallback
语义:
* @param topic name of the topic on the message was published to
* @param message the actual message.
* @throws Exception if a terminal error has occurred, and the client should be
* shut down.
*/
public void messageArrived(String topic, MqttMessage message) throws Exception;
因此,根据 Paho 客户端的特性,我们无法在此通道适配器上对它们进行批处理。
我们可以从 Spring 集成角度向您建议的是 Aggregator EIP 实施。
在您的情况下,您应该在 mqttRouterChannel
之前为 AggregatorFactoryBean
添加 @ServiceActivator
@Bean
,然后再发送到路由器。
这可能很简单:
@Bean
@ServiceActivator(inputChannel = "mqttAggregatorChannel")
AggregatorFactoryBean mqttAggregator() {
AggregatorFactoryBean aggregator = new AggregatorFactoryBean();
aggregator.setProcessorBean(new DefaultAggregatingMessageGroupProcessor());
aggregator.setCorrelationStrategy(m -> 1);
aggregator.setReleaseStrategy(new MessageCountReleaseStrategy(10));
aggregator.setExpireGroupsUponCompletion(true);
aggregator.setSendPartialResultOnExpiry(true);
aggregator.setGroupTimeoutExpression(new ValueExpression<>(1000));
aggregator.setOutputChannelName("mqttRouterChannel");
return aggregator;
}
在 Reference Manual 中查看更多信息。
这就是我使用 spring 定义我的 mqtt 连接的方式 integration.i 我不确定这是否可行 bt 我们能否设置一个 mqtt 订阅者在收到 10 条消息后工作。现在订阅者在发布消息后正常工作。
@Autowired
ConnectorConfig config;
@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
factory.setServerURIs(config.getUrl());
factory.setUserName(config.getUser());
factory.setPassword(config.getPass());
return factory;
}
@Bean
public MessageProducer inbound() {
MqttPahoMessageDrivenChannelAdapter adapter =
new MqttPahoMessageDrivenChannelAdapter(config.getClientid(), mqttClientFactory(), "ALERT", "READING");
adapter.setCompletionTimeout(5000);
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setQos(1);
adapter.setOutputChannel(mqttRouterChannel());
return adapter;
}
/**this is router**/
@MessageEndpoint
public class MessageRouter {
private final Logger logger = LoggerFactory.getLogger(MessageRouter.class);
static final String ALERT = "ALERT";
static final String READING = "READING";
@Router(inputChannel = "mqttRouterChannel")
public String route(@Header("mqtt_topic") String topic){
String route = null;
switch (topic){
case ALERT:
logger.info("alert message received");
route = "alertTransformerChannel";
break;
case READING:
logger.info("reading message received");
route = "readingTransformerChannel";
break;
}
return route;
}
}
i need to batch up groups of 10 messages at a time
这不是 MqttPahoMessageDrivenChannelAdapter
的责任。
我们在那里使用 MqttCallback
语义:
* @param topic name of the topic on the message was published to
* @param message the actual message.
* @throws Exception if a terminal error has occurred, and the client should be
* shut down.
*/
public void messageArrived(String topic, MqttMessage message) throws Exception;
因此,根据 Paho 客户端的特性,我们无法在此通道适配器上对它们进行批处理。
我们可以从 Spring 集成角度向您建议的是 Aggregator EIP 实施。
在您的情况下,您应该在 mqttRouterChannel
之前为 AggregatorFactoryBean
添加 @ServiceActivator
@Bean
,然后再发送到路由器。
这可能很简单:
@Bean
@ServiceActivator(inputChannel = "mqttAggregatorChannel")
AggregatorFactoryBean mqttAggregator() {
AggregatorFactoryBean aggregator = new AggregatorFactoryBean();
aggregator.setProcessorBean(new DefaultAggregatingMessageGroupProcessor());
aggregator.setCorrelationStrategy(m -> 1);
aggregator.setReleaseStrategy(new MessageCountReleaseStrategy(10));
aggregator.setExpireGroupsUponCompletion(true);
aggregator.setSendPartialResultOnExpiry(true);
aggregator.setGroupTimeoutExpression(new ValueExpression<>(1000));
aggregator.setOutputChannelName("mqttRouterChannel");
return aggregator;
}
在 Reference Manual 中查看更多信息。