在 RabbitMQ 中发布消息时发送 MessageProperties [priority=anyInteger]
Send MessageProperties [priority=anyInteger] while publishing message in RabbitMQ
我们在项目中使用了 rabbit MQ 和 Spring 集成。每个消息都有传递模式、header、属性和有效负载部分。
我们想添加一个属性,即值为 2(或任意整数)的优先级(priority),有效负载为 "test message 3",并将消息发布到名为 OES 的队列。请查看屏幕截图。
如何在下面的 outbound-channel-adapter(Spring 集成)中添加消息属性,即优先级 = 2(或任何值)?我知道可以通过把名称加入 "mapped-request-headers" 来添加 "headers",但我想添加的是属性。"outbound-channel-adapter" 中没有为 MessageProperties 定义的属性。有没有办法解决这个问题?
我们对有效负载没有任何问题,它已经可以正常发送。我们只想添加优先级为 2(或任意值)的 MessageProperties。如何在 outbound-channel-adapter 中添加它(不要硬编码,应该是通用的)?
<!-- Outbound AMQP adapter. Consumes messages from producer-processed-event-channel and
publishes them via the amqpPublishingTemplate bean to events_forwarding_exchange,
taking the routing key from the 'routing-path' message header.
The mapped-request-headers list should be symmetric with
the list on the consumer side defined in consumerbeans.consumerHeaderMapper(). -->
<int-amqp:outbound-channel-adapter id="publishingAmqpAdapter"
channel="producer-processed-event-channel"
amqp-template="amqpPublishingTemplate"
exchange-name="events_forwarding_exchange"
routing-key-expression="headers['routing-path']"
mapped-request-headers="X-CallerIdentity,routing-path,content-type,route_to*,event-type,compression-state,STANDARD_REQUEST_HEADERS"
/>
其他配置:
<!-- Chain that routes and transforms the ApplicationEvent into a JSON string.
Messages arriving on producer-inbound-event-channel pass through the four
transformers below in the listed order; the result is sent to
producer-routed-event-channel. -->
<int:chain id="routingAndTransforming"
input-channel="producer-inbound-event-channel"
output-channel="producer-routed-event-channel">
<int:transformer ref="outboundMessageTracker"/>
<int:transformer ref="messagePropertiesTransformer"/>
<int:transformer ref="eventRouter"/>
<int:transformer ref="eventToJsonTransformer"/>
</int:chain>
<!-- Transformer backed by the payloadCompressor bean: reads from
compress-message-payload and writes to producer-processed-event-channel. -->
<int:transformer id="messagePayloadCompressor"
input-channel="compress-message-payload"
output-channel="producer-processed-event-channel"
ref="payloadCompressor"/>
/**
 * Spring configuration for the AMQP producer side. Also imports the
 * {@code event-producer-flow.xml} integration flow from the classpath.
 */
@Configuration("amqpProducerBeans")
@ImportResource("classpath:com/apple/store/platform/events/si/event-producer-flow.xml")
public class AmqpProducerBeans {

    /**
     * Builds the template registered under the bean name {@code amqpPublishingTemplate}.
     *
     * @return a {@link RabbitTemplate} created on the producer connection factory,
     *         using a {@link SimpleMessageConverter} that creates message ids
     */
    @Bean(name = "amqpPublishingTemplate")
    public AmqpTemplate amqpTemplate() {
        logger.debug("creating amqp publishing template");

        // Message ids are needed by the retry logic.
        SimpleMessageConverter idAssigningConverter = new SimpleMessageConverter();
        idAssigningConverter.setCreateMessageIds(true);

        RabbitTemplate publishingTemplate = new RabbitTemplate(producerConnectionFactory());
        publishingTemplate.setMessageConverter(idAssigningConverter);
        return publishingTemplate;
    }
    /*Other code commented */
}
其他代码:
import org.springframework.integration.Message;
import org.springframework.integration.annotation.Transformer;
import org.springframework.integration.message.GenericMessage;
public class PayloadCompressor {
/**
 * Wraps the compressed payload in a new message that carries the incoming
 * message's headers, with {@code compression-state} set to
 * {@link CompressionState#COMPRESSED}.
 *
 * @param message the incoming message with a {@code String} payload
 * @return a new message whose payload is the compressed byte array
 */
@Transformer
public Message<byte[]> compress(Message<String> message){
    /* some code commented */
    // Work on a mutable copy of the incoming headers.
    Map<String, Object> copiedHeaders = new HashMap<String, Object>(message.getHeaders());
    // put() replaces any existing value for this key.
    copiedHeaders.put("compression-state", CompressionState.COMPRESSED);
    return new GenericMessage<byte[]>(compressedPayload, copiedHeaders);
}
如果我们不使用 Spring 集成,就可以像下面这样使用 channel.basicPublish 并发送 MessageProperties。
// Publishes a single message with AMQP priority 3 using the plain RabbitMQ Java client.
// NOTE(review): host and credentials are hard-coded here; externalize them before real use.
ConnectionFactory factory = new ConnectionFactory();
factory.setVirtualHost("/");
factory.setHost("10.102.175.30");
factory.setUsername("rahul");
factory.setPassword("rahul");
factory.setPort(5672);
Connection connection = factory.newConnection();
try {
    System.out.println("got connection "+connection);
    Channel channel = connection.createChannel();
    String exchangeName = "HeaderExchange";
    String routingKey = "testkey";
    byte[] messageBodyBytes = "Message having priority value 3".getBytes();
    // MessageProperties.BASIC is a shared static constant, so it must not be mutated.
    // Derive a separate properties object from it and set the priority on that copy.
    channel.basicPublish(exchangeName,
            routingKey,
            true,
            MessageProperties.BASIC.builder().priority(3).build(),
            messageBodyBytes);
} finally {
    // Closing the connection also closes the channel opened on it.
    connection.close();
}
如果您需要更多详细信息,请告诉我。
属性已经自动映射 - 请参阅 header mapper。
只需使用 <header-enricher/>
设置适当的 header,它就会映射到正确的属性。对于优先级,对应的常量见此处;AMQP 专用的 header 常量见此处(两处"此处"在原文中均为链接)。
我们在项目中使用了 rabbit MQ 和 Spring 集成。每个消息都有传递模式、header、属性和有效负载部分。 我们想添加一个属性,即值为 2(或任意整数)的优先级(priority),有效负载为 "test message 3",并将消息发布到名为 OES 的队列。请查看屏幕截图。
如何在下面的 outbound-channel-adapter(Spring 集成)中添加消息属性,即优先级 = 2(或任何值)?我知道可以通过把名称加入 "mapped-request-headers" 来添加 "headers",但我想添加的是属性。"outbound-channel-adapter" 中没有为 MessageProperties 定义的属性。有没有办法解决这个问题?
我们对有效负载没有任何问题,它已经可以正常发送。我们只想添加优先级为 2(或任意值)的 MessageProperties。如何在 outbound-channel-adapter 中添加它(不要硬编码,应该是通用的)?
<!-- Outbound AMQP adapter. Consumes messages from producer-processed-event-channel and
publishes them via the amqpPublishingTemplate bean to events_forwarding_exchange,
taking the routing key from the 'routing-path' message header.
The mapped-request-headers list should be symmetric with
the list on the consumer side defined in consumerbeans.consumerHeaderMapper(). -->
<int-amqp:outbound-channel-adapter id="publishingAmqpAdapter"
channel="producer-processed-event-channel"
amqp-template="amqpPublishingTemplate"
exchange-name="events_forwarding_exchange"
routing-key-expression="headers['routing-path']"
mapped-request-headers="X-CallerIdentity,routing-path,content-type,route_to*,event-type,compression-state,STANDARD_REQUEST_HEADERS"
/>
其他配置:
<!-- Chain that routes and transforms the ApplicationEvent into a JSON string.
Messages arriving on producer-inbound-event-channel pass through the four
transformers below in the listed order; the result is sent to
producer-routed-event-channel. -->
<int:chain id="routingAndTransforming"
input-channel="producer-inbound-event-channel"
output-channel="producer-routed-event-channel">
<int:transformer ref="outboundMessageTracker"/>
<int:transformer ref="messagePropertiesTransformer"/>
<int:transformer ref="eventRouter"/>
<int:transformer ref="eventToJsonTransformer"/>
</int:chain>
<!-- Transformer backed by the payloadCompressor bean: reads from
compress-message-payload and writes to producer-processed-event-channel. -->
<int:transformer id="messagePayloadCompressor"
input-channel="compress-message-payload"
output-channel="producer-processed-event-channel"
ref="payloadCompressor"/>
/**
 * Spring configuration for the AMQP producer side. Also imports the
 * {@code event-producer-flow.xml} integration flow from the classpath.
 */
@Configuration("amqpProducerBeans")
@ImportResource("classpath:com/apple/store/platform/events/si/event-producer-flow.xml")
public class AmqpProducerBeans {

    /**
     * Builds the template registered under the bean name {@code amqpPublishingTemplate}.
     *
     * @return a {@link RabbitTemplate} created on the producer connection factory,
     *         using a {@link SimpleMessageConverter} that creates message ids
     */
    @Bean(name = "amqpPublishingTemplate")
    public AmqpTemplate amqpTemplate() {
        logger.debug("creating amqp publishing template");

        // Message ids are needed by the retry logic.
        SimpleMessageConverter idAssigningConverter = new SimpleMessageConverter();
        idAssigningConverter.setCreateMessageIds(true);

        RabbitTemplate publishingTemplate = new RabbitTemplate(producerConnectionFactory());
        publishingTemplate.setMessageConverter(idAssigningConverter);
        return publishingTemplate;
    }
    /*Other code commented */
}
其他代码:
import org.springframework.integration.Message;
import org.springframework.integration.annotation.Transformer;
import org.springframework.integration.message.GenericMessage;
public class PayloadCompressor {
/**
 * Wraps the compressed payload in a new message that carries the incoming
 * message's headers, with {@code compression-state} set to
 * {@link CompressionState#COMPRESSED}.
 *
 * @param message the incoming message with a {@code String} payload
 * @return a new message whose payload is the compressed byte array
 */
@Transformer
public Message<byte[]> compress(Message<String> message){
    /* some code commented */
    // Work on a mutable copy of the incoming headers.
    Map<String, Object> copiedHeaders = new HashMap<String, Object>(message.getHeaders());
    // put() replaces any existing value for this key.
    copiedHeaders.put("compression-state", CompressionState.COMPRESSED);
    return new GenericMessage<byte[]>(compressedPayload, copiedHeaders);
}
如果我们不使用 Spring 集成,就可以像下面这样使用 channel.basicPublish 并发送 MessageProperties。
// Publishes a single message with AMQP priority 3 using the plain RabbitMQ Java client.
// NOTE(review): host and credentials are hard-coded here; externalize them before real use.
ConnectionFactory factory = new ConnectionFactory();
factory.setVirtualHost("/");
factory.setHost("10.102.175.30");
factory.setUsername("rahul");
factory.setPassword("rahul");
factory.setPort(5672);
Connection connection = factory.newConnection();
try {
    System.out.println("got connection "+connection);
    Channel channel = connection.createChannel();
    String exchangeName = "HeaderExchange";
    String routingKey = "testkey";
    byte[] messageBodyBytes = "Message having priority value 3".getBytes();
    // MessageProperties.BASIC is a shared static constant, so it must not be mutated.
    // Derive a separate properties object from it and set the priority on that copy.
    channel.basicPublish(exchangeName,
            routingKey,
            true,
            MessageProperties.BASIC.builder().priority(3).build(),
            messageBodyBytes);
} finally {
    // Closing the connection also closes the channel opened on it.
    connection.close();
}
如果您需要更多详细信息,请告诉我。
属性已经自动映射 - 请参阅 header mapper。
只需使用 <header-enricher/>
设置适当的 header,它就会映射到正确的属性。对于优先级,对应的常量见此处;AMQP 专用的 header 常量见此处(两处"此处"在原文中均为链接)。