Spring 集成 amqp 重复添加 headers
Spring integration amqp adding headers repetitively
我们长期以来一直在使用 spring-integration-core、spring-integration-amqp 以及 RabbitMQ。我们的服务还借助 x-death header 和 amqp_expiration header 实现死信(dead-letter)重试机制。在我们决定升级 spring-integration 版本之前,它一直运行良好。
之前的版本:5.0.6.RELEASE
新版本:5.2.4.RELEASE
Rabbit mq headers 之前的版本
MessageProperties [headers={x-first-death-exchange=dms.arkona.exchange, x-death=[{reason=expired, original-expiration=5000, count=1, exchange=dms.arkona.exchange, time=Mon Feb 14 18:03:11 PST 2022, routing-keys=[push.customer.arkona.controller.update.wait.key], queue=push.customer.arkona.controller.update.wait}], x-first-death-reason=expired, x-first-death-queue=push.customer.arkona.controller.update.wait}, contentType=application/x-java-serialized-object, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=dms.arkona.exchange, receivedRoutingKey=push.customer.arkona.controller.update.key, deliveryTag=2407, consumerTag=amq.ctag-1_bYuEGts-Rfk6fqDyIqBw, consumerQueue=push.customer.arkona.controller.update]), amqp_expiration=10000, id=b77509b2-da63-1be0-ffd8-b96cbdc12c89, timestamp=1644890591793}
Rabbit mq headers 新版本
headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=dms.transformer.exchange, amqp_deliveryTag=7875, amqp_consumerQueue=pull.parts.transformer.controller, amqp_redelivered=false, amqp_receivedRoutingKey=pull.parts.transformer.controller.key, x-first-death-exchange=dms.transformer.exchange, amqp_timestamp=Wed Feb 16 22:13:14 PST 2022, amqp_messageId=0df5a50e-9c15-ad1d-036d-a98993d2fa92, x-death=[{reason=expired, original-expiration=40000, count=1, exchange=dms.transformer.exchange, time=Wed Feb 16 22:13:54 PST 2022, routing-keys=[pull.parts.transformer.controller.wait.key], queue=pull.parts.transformer.controller.wait}], x-first-death-reason=expired, x-first-death-queue=pull.parts.transformer.controller.wait, id=8c7b4d13-5a20-b430-05fc-492ed98971fc, sourceData=(Body:'{soapActionLookup=opentrack.dealertrack.com/CounterTicketLookup, ticketNumber=161696, truePullTime=2022-02-17T06:12:39Z, currentDate=2022-02-17, secondarySearchSoapAction=null, mode=PARTS_LOOKUP, CompanyNumber=OT2, dmsUsername=DTk@aryA, EnterpriseCode=OTIM, seachEndDate=null, dealerId=1374, messageId={"method":"PARTS_PULL","order_number":"161696","dms":"ARK","request_id":"8347fecf-b7db-40ad-aa0e-fa32615b8c5e","dealer_id":"1374","timestamp":1645078359343}, soapAction=opentrack.dealertrack.com/CounterTicketLookup, dmsHosturl=https://ot.dms.dealertrack.com/partsapi.asmx, closeDate=null, searchStartDate=null, soapActionSearch=opentrack.dealertrack.com/CounterTicketSearch, secondaryDMSURL=null, ServerName=itrack7.arkona.com, currentPullTime=2022-02-17T06:12:39Z, dmsPassword=w34$Rp9, secondaryLookupSoapAction=null, </CounterTicketSearchResponseSearchResult><CounterTicketSearchResponseSearchResult><TicketNumber>161696</TicketNumber><SaleType>R</SaleType><PriceLevel>1</PriceLevel> requestXml=<CounterTicketLookup xmlns="opentrack.dealertrack.com">
<Dealer>
<EnterpriseCode>OTIM</EnterpriseCode>
<CompanyNumber>OT2</CompanyNumber>
<ServerName>itrack7.arkona.com</ServerName>
</Dealer>
<LookupParms>
<TicketNumber>161696</TicketNumber>
</LookupParms>
</CounterTicketLookup>
, messageXml=<RepairOrderWrap>
<Dealer>
<DealerID/>
<DealerName/>
<DealerDMS>
<DealerDMSID/>
<DMSName/>
<UserName/>
<Password/>
<HostUrl/>
<EnterpriseCode/>
<CompanyNumber/>
<ServerName/>
</DealerDMS>
</Dealer>
<RepairOrder>
<ID/>
<OrderNumber>161696</OrderNumber>
<OrderType>PO</OrderType>
<Amount>227.90</Amount>
<OrderDate>20220117 00:00:00</OrderDate>
<CloseDate/>
<PrintDate>20220117 00:00:00</PrintDate>
<DealerAssociateID/>
<AssociateDMSID>870</AssociateDMSID>
<Description/>
<OrderStatus>P</OrderStatus>
<NumberOfInvoices/>
<ReadyROData/>
<IsPaid/>
<PaidAmount/>
<IsPaidInKaarma/>
<IsPaymentRequestSent/>
<Tag/>
<MileageText/>
<InvoiceUrl/>
</RepairOrder>
<CustWrap>
<CAttrs>
<IsBusiness/>
<AssignedSA/>
<Comments/>
</CAttrs>
<IdentityAttrs>
<Attr value="SSN"/>
<Attr value="DriverLicense"/>
<Attr value="BirthDate"/>
<Attr value="Gender"/>
<Attr value="Language"/>
</IdentityAttrs>
<CustPref>
<Pref value=""/>
<Pref value="text">Y</Pref>
<Pref value="email">Y</Pref>
<Pref value="call">Y</Pref>
<Pref value="phone"/>
<Pref value="postal">Y</Pref>
</CustPref>
<PreferredComm/>
</RepairOrderWrap>
, dms=ARK, original_expiration=10000}' MessageProperties [headers={x-first-death-exchange=dms.transformer.exchange, x-death=[{reason=expired, original-expiration=20000, count=1, exchange=dms.transformer.exchange, time=Wed Feb 16 22:13:14 PST 2022, routing-keys=[pull.parts.transformer.controller.wait.key], queue=pull.parts.transformer.controller.wait}], x-first-death-reason=expired, x-first-death-queue=pull.parts.transformer.controller.wait, sourceData=(Body:'{soapActionLookup=opentrack.dealertrack.com/CounterTicketLookup, ticketNumber=161696, truePullTime=2022-02-17T06:12:39Z, currentDate=2022-02-17, secondarySearchSoapAction=null, mode=PARTS_LOOKUP, CompanyNumber=OT2, dmsUsername=DTk@aryA, EnterpriseCode=OTIM, seachEndDate=null, dealerId=1374, messageId={"method":"PARTS_PULL","order_number":"161696","dms":"ARK","request_id":"8347fecf-b7db-40ad-aa0e-fa32615b8c5e","dealer_id":"1374","timestamp":1645078359343}, soapAction=opentrack.dealertrack.com/CounterTicketLookup, dmsHosturl=https://ot.dms.dealertrack.com/partsapi.asmx, closeDate=null, searchStartDate=null, soapActionSearch=opentrack.dealertrack.com/CounterTicketSearch, secondaryDMSURL=null, ServerName=itrack7.arkona.com, currentPullTime=2022-02-17T06:12:39Z, dmsPassword=w34$Rp9, secondaryLookupSoapAction=null<InvoiceNumber/><PurchaseOrderNumber/><Total>139.76</Total></CounterTicketSearchResponseSearchResult><CounterTicketSearchResponseSearchResult><TicketNumber>161703</TicketNumber><CounterPersonID>054</CounterPersonID><CustomerKey>1090627</CustomerKey><CustName>EURO MOTORS</CustName><CustPhoneNo>7179200375</CustPhoneNo><SaleType>W</SaleType><PriceLevel>15</PriceLevel><OpenOrTranDate>20220117</OpenOrTranDate><InvoiceNumber/><PurchaseOrderNumber>T093945</PurchaseOrderNumber><Total>471.56</Total></CounterTicketSearchResponseSearchResult></Results></CounterTicketSearchResult></CounterTicketSearchResponse>, lastPullTime=2022-02-16T18:12:39Z, requestXml=<CounterTicketLookup xmlns="opentrack.dealertrack.com">
<Dealer>
<EnterpriseCode>OTIM</EnterpriseCode>
<CompanyNumber>OT2</CompanyNumber>
<ServerName>itrack7.arkona.com</ServerName>
</Dealer>
<LookupParms>
<TicketNumber>161696</TicketNumber>
</LookupParms>
</CounterTicketLookup>
, messageXml=<RepairOrderWrap>
<Dealer>
<DealerID/>
<DealerName/>
<DealerDMS>
<DealerDMSID/>
<DMSName/>
<UserName/>
<Password/>
<HostUrl/>
<EnterpriseCode/>
<CompanyNumber/>
<ServerName/>
</DealerDMS>
</Dealer>
<RepairOrder>
<ID/>
<OrderNumber>161696</OrderNumber>
<OrderType>PO</OrderType>
<Amount>227.90</Amount>
<OrderDate>20220117 00:00:00</OrderDate>
<CloseDate/>
<PrintDate>20220117 00:00:00</PrintDate>
<DealerAssociateID/>
<AssociateDMSID>870</AssociateDMSID>
<Description/>
<OrderStatus>P</OrderStatus>
<NumberOfInvoices/>
<ReadyROData/>
<IsPaid/>
<PaidAmount/>
<IsPaidInKaarma/>
<IsPaymentRequestSent/>
<Tag/>
<MileageText/>
<InvoiceUrl/>
</RepairOrder>
<CustWrap>
<CAttrs>
<IsBusiness/>
<AssignedSA/>
<Comments/>
</CAttrs>
<IdentityAttrs>
<Attr value="SSN"/>
<Attr value="DriverLicense"/>
<Attr value="BirthDate"/>
<Attr value="Gender"/>
<Attr value="Language"/>
</IdentityAttrs>
<CustPref>
<Pref value=""/>
<Pref value="text">Y</Pref>
<Pref value="email">Y</Pref>
<Pref value="call">Y</Pref>
<Pref value="phone"/>
<Pref value="postal">Y</Pref>
</CustPref>
<PreferredComm/>
<Customer>
<ID/>
<Version/>
<CustomerKey>1115218</CustomerKey>
<TypeCode/>
<CustomerType/>
<DealerID/>
<FName>DORIS</FName>
<MName>J</MName>
<LName>KAUFFMAN</LName>
<ImageUrl/>
<Indexed/>
<Valid/>
<OptedOut/>
<UpdatedBy/>
<CreatedBy/>
<Communications>
<Communication>
<ID/>
<Version/>
<Type>P</Type>
<Value>7174681776</Value>
<CreatedBy/>
<Preferred/>
<Valid>1</Valid>
<Label>ct_phone</Label>
<Desc/>
<UpdatedBy/>
<CommPrefs/>
</Communication>
</Communications>
</Customer>
<LastModified/>
<StatusCode/>
<StatusMessage/>
</CustWrap>
</RepairOrderWrap>
, dms=ARK, original_expiration=5000}' MessageProperties [headers={x-first-death-exchange=dms.transformer.exchange, x-death=[{reason=expired, original-expiration=10000, count=1, exchange=dms.transformer.exchange, time=Wed Feb 16 22:12:54 PST 2022, routing-keys=[pull.parts.transformer.controller.wait.key], queue=pull.parts.transformer.controller.wait}], x-first-death-reason=expired, x-first-death-queue=pull.parts.transformer.controller.wait, sourceData=(Body:'{soapActionLookup=opentrack.dealertrack.com/CounterTicketLookup, ticketNumber=161696, truePullTime=2022-02-17T06:12:39Z, currentDate=2022-02-17, secondarySearchSoapAction=null, mode=PARTS_LOOKUP, CompanyNumber=OT2, dmsUsername=DTk@aryA, EnterpriseCode=OTIM, seachEndDate=null, dealerId=1374, messageId={"method":"PARTS_PULL","order_number":"161696","dms":"ARK","request_id":"8347fecf-b7db-40ad-aa0e-fa32615b8c5e","dealer_id":"1374","timestamp":1645078359343}, soapAction=opentrack.dealertrack.com/CounterTicketLookup, dmsHosturl=https://ot.dms.dealertrack.com/partsapi.asmx, closeDate=null, searchStartDate=null, soapActionSearch=opentrack.dealertrack.com/CounterTicketSearch, secondaryDMSURL=null, ServerName=itrack7.arkona.com, currentPullTime=2022-02-17T06:12:39Z, dmsPassword=w34$Rp9, secondaryLookupSoapAction=null, requestXml=<CounterTicketLookup xmlns="opentrack.dealertrack.com">
<Dealer>
<EnterpriseCode>OTIM</EnterpriseCode>
<CompanyNumber>OT2</CompanyNumber>
<ServerName>itrack7.arkona.com</ServerName>
</Dealer>
<LookupParms>
<TicketNumber>161696</TicketNumber>
</LookupParms>
</CounterTicketLookup>
, messageXml=<RepairOrderWrap>
<Dealer>
<DealerID/>
<DealerName/>
<DealerDMS>
<DealerDMSID/>
<DMSName/>
<UserName/>
<Password/>
<HostUrl/>
<EnterpriseCode/>
<CompanyNumber/>
<ServerName/>
</DealerDMS>
</Dealer>
<RepairOrder>
<ID/>
<OrderNumber>161696</OrderNumber>
<OrderType>PO</OrderType>
<Amount>227.90</Amount>
<OrderDate>20220117 00:00:00</OrderDate>
<CloseDate/>
<PrintDate>20220117 00:00:00</PrintDate>
<DealerAssociateID/>
<AssociateDMSID>870</AssociateDMSID>
<Description/>
<OrderStatus>P</OrderStatus>
<NumberOfInvoices/>
<ReadyROData/>
<IsPaid/>
<PaidAmount/>
<IsPaidInKaarma/>
<IsPaymentRequestSent/>
<Tag/>
<MileageText/>
<InvoiceUrl/>
</RepairOrder>
<CustWrap>
<CAttrs>
<IsBusiness/>
<AssignedSA/>
<Comments/>
</CAttrs>
<IdentityAttrs>
<Attr value="SSN"/>
<Attr value="DriverLicense"/>
<Attr value="BirthDate"/>
<Attr value="Gender"/>
<Attr value="Language"/>
</IdentityAttrs>
<Customer>
<ID/>
<Version/>
<CustomerKey>1115218</CustomerKey>
<TypeCode/>
<CustomerType/>
<DealerID/>
<FName>DORIS</FName>
<MName>J</MName>
<LName>KAUFFMAN</LName>
<ImageUrl/>
<Indexed/>
<Valid/>
<OptedOut/>
<UpdatedBy/>
<CreatedBy/>
<Communications>
<Communication>
<ID/>
<Version/>
<Type>P</Type>
<Value>7174681776</Value>
<CreatedBy/>
<LastModified/>
<StatusCode/>
<StatusMessage/>
</CustWrap>
</RepairOrderWrap>
, dms=ARK, original_expiration=null}' MessageProperties [headers={x-first-death-exchange=dms.transformer.exchange, x-death=[{reason=expired, original-expiration=5000, count=1, exchange=dms.transformer.exchange, time=Wed Feb 16 22:12:44 PST 2022, routing-keys=[pull.parts.transformer.controller.wait.key], queue=pull.parts.transformer.controller.wait}], x-first-death-reason=expired, x-first-death-queue=pull.parts.transformer.controller.wait, sourceData=(Body:'{soapActionLookup=opentrack.dealertrack.com/CounterTicketLookup, ticketNumber=161696, truePullTime=2022-02-17T06:12:39Z, currentDate=2022-02-17, secondarySearchSoapAction=null, mode=PARTS_LOOKUP, CompanyNumber=OT2, dmsUsername=DTk@aryA, EnterpriseCode=OTIM, seachEndDate=null, dealerId=1374, messageId={"method":"PARTS_PULL","order_number":"161696","dms":"ARK","request_id":"8347fecf-b7db-40ad-aa0e-fa32615b8c5e","dealer_id":"1374","timestamp":1645078359343}, soapAction=opentrack.dealertrack.com/CounterTicketLookup, dmsHosturl=https://ot.dms.dealertrack.com/partsapi.asmx, closeDate=null, searchStartDate=null, soapActionSearch=opentrack.dealertrack.com/CounterTicketSearch, secondaryDMSURL=null, ServerName=itrack7.arkona.com, currentPullTime=2022-02-17T06:12:39Z, dmsPassword=w34$Rp9, secondaryLookupSoapAction=null, previousResponseXml=<?contentType=application/x-java-serialized-object, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=dms.transformer.exchange, receivedRoutingKey=pull.parts.transformer.controller.key, deliveryTag=7869, consumerTag=amq.ctag-dZLHnCtvYWzpTvPM4ASzjA, consumerQueue=pull.parts.transformer.controller])}, timestamp=Wed Feb 16 22:12:39 PST 2022, messageId=2da8985b-9c80-240b-fb29-7294035750f7, contentType=application/x-java-serialized-object, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=dms.transformer.exchange, receivedRoutingKey=pull.parts.transformer.controller.key, 
deliveryTag=7873, consumerTag=amq.ctag-Fw8OPUahC_jNd--kZIdtSg, consumerQueue=pull.parts.transformer.controller])}, timestamp=Wed Feb 16 22:12:44 PST 2022, messageId=066c4289-5a25-8459-8571-8078421b68d5, contentType=application/x-java-serialized-object, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=dms.transformer.exchange, receivedRoutingKey=pull.parts.transformer.controller.key, deliveryTag=7872, consumerTag=amq.ctag-GYy8qHuHqhZ1s3PyFv5VJA, consumerQueue=pull.parts.transformer.controller])}, timestamp=Wed Feb 16 22:12:54 PST 2022, messageId=5935a433-f7fa-a2c9-f782-d15b4235c91c, contentType=application/x-java-serialized-object, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=dms.transformer.exchange, receivedRoutingKey=pull.parts.transformer.controller.key, deliveryTag=7873, consumerTag=amq.ctag-h6izVs7ziHOH93-M_CBhvw, consumerQueue=pull.parts.transformer.controller]), amqp_consumerTag=amq.ctag-WdkMr705y_K0iJUOnbGTug, contentType=application/x-java-serialized-object, timestamp=1645078434806}]
```
Notice how spring-integration / spring-integration-amqp adds the message body (as the sourceData header) and the other headers again each time the message passes through the dead letter queue.
Can someone please point us to a setting that disables this repeated addition of headers? It is causing the following error with RabbitMQ:
caused by: java.lang.IllegalArgumentException: Content headers exceeded max frame size: 132603 > 131072
基本上,消息 header 变得非常大,以至于超过了最大帧大小(131072 字节)的限制。这个问题在 5.0.6 版本中不会出现。
更新:
我们在消息体中放入了一个自定义键 original_expiration,它的值取自 x-death header:
/**
 * Records the expiration carried by the first {@code x-death} entry in the payload.
 *
 * <p>When the headers contain no {@code x-death} key, {@code original_expiration}
 * is explicitly set to {@code null}. When the key is present but the list is empty
 * or its first entry has no {@code original-expiration}, the payload is left untouched.
 *
 * @param messageHeaders headers of the failed message
 * @param message        payload map that receives the {@code original_expiration} key
 */
public void processFailedMessage(MessageHeaders messageHeaders, HashMap<String,Object> message)
{
    // No dead-letter history: mark the payload as never having expired.
    if (!messageHeaders.containsKey("x-death")) {
        message.put("original_expiration", null);
        return;
    }

    List<HashMap<String, Object>> deaths =
            (List<HashMap<String, Object>>) messageHeaders.get("x-death");
    if (deaths.isEmpty()) {
        return;
    }

    // Only the first entry of the x-death list is consulted.
    HashMap<String, Object> firstDeath = deaths.get(0);
    if (!firstDeath.containsKey("original-expiration")) {
        return;
    }

    message.put("original_expiration", (String) firstDeath.get("original-expiration"));
    logger.info("original-expiration = "+firstDeath.get("original-expiration"));
}
然后我们用 header-enricher 来设置 amqp_expiration header:
/**
 * Computes the next {@code amqp_expiration} value for a failed message.
 *
 * <p>If the payload has no {@code original_expiration}, the configured initial
 * expiration is used. Otherwise the number of retries already performed is
 * derived as {@code log(original / initial) / log(multiplier)}, rounded to the
 * nearest whole number, and the previous expiration is multiplied by the
 * configured multiplier unless the retry budget is used up.
 *
 * @param message payload map; reads {@code original_expiration} and {@code messageId}
 * @return the new expiration as a string, or {@code null} when
 *         {@code maximumretries} has been reached
 * @throws NumberFormatException if {@code original_expiration} is not an integer string
 */
public String updateExpiration(HashMap<String, Object> message)
{
    String originalExpiration = (String) message.get("original_expiration");
    Integer newExpiration = null;

    if (originalExpiration == null)
    {
        newExpiration = this.initialexpiration;
    }
    else
    {
        int previousExpiration = Integer.parseInt(originalExpiration);

        // Divide in floating point. Casting after an integer division truncates
        // the ratio and makes it 0 (log = -Infinity) whenever the previous
        // expiration is smaller than the initial one.
        double x = Math.log((double) previousExpiration / this.initialexpiration);
        // NOTE(review): a multiplier of 1 makes y zero, so x/y is NaN or infinite
        // and the retry count below is meaningless - confirm multiplier > 1.
        double y = Math.log((double) this.multiplier);
        long retryCount = Math.round(x / y);

        logger.info(message.get("messageId")+" "+x+" "+y+" Retried "+(retryCount+1)+" of "+this.maximumretries);

        // Leave newExpiration null once the retry budget is exhausted.
        if ((retryCount + 1) < this.maximumretries)
        {
            newExpiration = previousExpiration * this.multiplier;
        }
    }

    String result = newExpiration != null ? String.valueOf(newExpiration) : null;
    logger.info(" NewExpiration = "+result);
    return result;
}
UPDATE :
For the DLQ logic: after updating the expiration header we send the message to a router, which checks whether the value of amqp_expiration is empty. Based on that, it sends the message either to the wait (dead-letter) queue or to the failed queue.
<!-- Wait queue: messages dead-lettered from this queue are republished to
     dms.transformer.exchange with routing key pull.parts.transformer.controller.key. -->
<rabbit:queue name="pull.parts.transformer.controller.wait">
    <rabbit:queue-arguments>
        <entry key="x-dead-letter-exchange" value="dms.transformer.exchange"/>
        <entry key="x-dead-letter-routing-key" value="pull.parts.transformer.controller.key"/>
    </rabbit:queue-arguments>
</rabbit:queue>
/**
 * Chooses the output channel for a failed parts-order message based on the
 * {@code amqp_expiration} header.
 *
 * @param message        payload map; only {@code messageId} is read, for logging
 * @param messageHeaders headers of the failed message
 * @return the "wait" channel name when {@code amqp_expiration} holds a usable value;
 *         the "failed" channel name when the header is missing, null, empty,
 *         or the literal string {@code "null"}
 */
@Router(inputChannel="toPullPartsOrderErrorMessageRouter")
public String processPartsOrderFailedMessageExpiration(@Payload HashMap<String,Object> message, @Headers MessageHeaders messageHeaders)
{
    logger.info("{} {}",message.get("messageId"),messageHeaders);

    // No expiration header at all: nothing left to retry.
    if (!messageHeaders.containsKey("amqp_expiration"))
    {
        return FromPullPartsOrderErrorMessageRouterFailed;
    }

    String expiration = (String) messageHeaders.get("amqp_expiration");
    boolean hasNoExpiration = expiration == null
            || expiration.isEmpty()
            || expiration.equals("null");

    return hasNoExpiration
            ? FromPullPartsOrderErrorMessageRouterFailed
            : FromPullPartsOrderErrorMessageRouterWait;
}
<!-- Inbound flow: messages consumed from pull.parts.transformer.controller are
     handed to syncer.onPartsOrderRequestRecievedFromDMS via PullPartsChannel;
     errors go to errorChannelPullParts. -->
<int:channel id="PullPartsChannel"/>

<int:service-activator
    input-channel="PullPartsChannel"
    ref="syncer"
    method="onPartsOrderRequestRecievedFromDMS"/>

<int-amqp:inbound-channel-adapter
    channel="PullPartsChannel"
    error-channel="errorChannelPullParts"
    queue-names="pull.parts.transformer.controller"
    concurrent-consumers="${partsorderpullthreads}"
    prefetch-count="${partsorderpullfetchcount}"
    connection-factory="rabbitConnectionFactory"
    header-mapper="syncerHeaderMapper"/>
<!-- Error flow: errorhelper.onErrorInPullParts handles messages from
     errorChannelPullParts, then the header enricher sets amqp_expiration from
     errorhelper.updateExpiration before the message reaches the router channel. -->
<int:channel id="fromPullPartsErrorHandler"/>

<int:service-activator
    id="errorHandlerPullParts"
    input-channel="errorChannelPullParts"
    output-channel="fromPullPartsErrorHandler"
    ref="errorhelper"
    method="onErrorInPullParts"/>

<int:header-enricher
    input-channel="fromPullPartsErrorHandler"
    output-channel="toPullPartsOrderErrorMessageRouter">
    <int:header name="amqp_expiration" ref="errorhelper" method="updateExpiration"/>
</int:header-enricher>
<!-- look at com.kaarma.syncer.utility.FailedMessageRouter -->

<!-- "Wait" branch: publish to dms.transformer.exchange with the wait routing key. -->
<int:channel id="fromPullPartsOrderErrorMessageRouterWait"/>
<int-amqp:outbound-channel-adapter
    channel="fromPullPartsOrderErrorMessageRouterWait"
    amqp-template="rabbitTemplate"
    exchange-name="dms.transformer.exchange"
    routing-key="pull.parts.transformer.controller.wait.key"/>

<!-- "Failed" branch: publish to dms.transformer.exchange with the failed routing key. -->
<int:channel id="fromPullPartsOrderErrorMessageRouterFailed"/>
<int-amqp:outbound-channel-adapter
    channel="fromPullPartsOrderErrorMessageRouterFailed"
    amqp-template="rabbitTemplate"
    exchange-name="dms.transformer.exchange"
    routing-key="pull.parts.transformer.controller.failed.key"/>
您可以配置 <int-amqp:outbound-channel-adapter>,使其仅映射您感兴趣的那些 headers。默认情况下,它会映射所有 headers(以 x- 开头的除外):
/**
 * Returns the default outbound header-name patterns: {@code "!x-*"} followed by {@code "*"}.
 * Presumably this excludes names starting with {@code x-} and matches everything
 * else - see the Spring Integration header-mapping documentation.
 */
private static String[] safeOutboundHeaders() {
    final String[] defaultPatterns = { "!x-*", "*" };
    return defaultPatterns;
}
参见 mapped-request-headers 属性及其文档:https://docs.spring.io/spring-integration/docs/current/reference/html/amqp.html#amqp-message-headers。
目前尚不清楚,您发送到此 AMQP 通道适配器的消息中那些没有展示出来的自定义 headers 是什么——因为“复制所有 headers”的逻辑一直都存在,并不是新版本才引入的。
这对我有用
/**
 * Builds an outbound AMQP header mapper whose request and reply header names
 * are both limited to the corresponding {@code STANDARD_*_HEADER_NAME_PATTERN}.
 *
 * @return the configured {@link DefaultAmqpHeaderMapper}
 */
@Bean
public DefaultAmqpHeaderMapper outboundMapper() {
    final DefaultAmqpHeaderMapper mapper = DefaultAmqpHeaderMapper.outboundMapper();
    mapper.setRequestHeaderNames(DefaultAmqpHeaderMapper.STANDARD_REQUEST_HEADER_NAME_PATTERN);
    mapper.setReplyHeaderNames(DefaultAmqpHeaderMapper.STANDARD_REPLY_HEADER_NAME_PATTERN);
    return mapper;
}
我在 outbound-channel-adapter 中使用了这个 bean,用于把消息从 Spring Integration 通道发送到 RabbitMQ。
我们长期以来一直在使用 spring-integration-core、spring-integration-amqp 以及 RabbitMQ。我们的服务还借助 x-death header 和 amqp_expiration header 实现死信(dead-letter)重试机制。在我们决定升级 spring-integration 版本之前,它一直运行良好。
之前的版本:5.0.6.RELEASE
新版本:5.2.4.RELEASE
Rabbit mq headers 之前的版本
MessageProperties [headers={x-first-death-exchange=dms.arkona.exchange, x-death=[{reason=expired, original-expiration=5000, count=1, exchange=dms.arkona.exchange, time=Mon Feb 14 18:03:11 PST 2022, routing-keys=[push.customer.arkona.controller.update.wait.key], queue=push.customer.arkona.controller.update.wait}], x-first-death-reason=expired, x-first-death-queue=push.customer.arkona.controller.update.wait}, contentType=application/x-java-serialized-object, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=dms.arkona.exchange, receivedRoutingKey=push.customer.arkona.controller.update.key, deliveryTag=2407, consumerTag=amq.ctag-1_bYuEGts-Rfk6fqDyIqBw, consumerQueue=push.customer.arkona.controller.update]), amqp_expiration=10000, id=b77509b2-da63-1be0-ffd8-b96cbdc12c89, timestamp=1644890591793}
Rabbit mq headers 新版本
headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=dms.transformer.exchange, amqp_deliveryTag=7875, amqp_consumerQueue=pull.parts.transformer.controller, amqp_redelivered=false, amqp_receivedRoutingKey=pull.parts.transformer.controller.key, x-first-death-exchange=dms.transformer.exchange, amqp_timestamp=Wed Feb 16 22:13:14 PST 2022, amqp_messageId=0df5a50e-9c15-ad1d-036d-a98993d2fa92, x-death=[{reason=expired, original-expiration=40000, count=1, exchange=dms.transformer.exchange, time=Wed Feb 16 22:13:54 PST 2022, routing-keys=[pull.parts.transformer.controller.wait.key], queue=pull.parts.transformer.controller.wait}], x-first-death-reason=expired, x-first-death-queue=pull.parts.transformer.controller.wait, id=8c7b4d13-5a20-b430-05fc-492ed98971fc, sourceData=(Body:'{soapActionLookup=opentrack.dealertrack.com/CounterTicketLookup, ticketNumber=161696, truePullTime=2022-02-17T06:12:39Z, currentDate=2022-02-17, secondarySearchSoapAction=null, mode=PARTS_LOOKUP, CompanyNumber=OT2, dmsUsername=DTk@aryA, EnterpriseCode=OTIM, seachEndDate=null, dealerId=1374, messageId={"method":"PARTS_PULL","order_number":"161696","dms":"ARK","request_id":"8347fecf-b7db-40ad-aa0e-fa32615b8c5e","dealer_id":"1374","timestamp":1645078359343}, soapAction=opentrack.dealertrack.com/CounterTicketLookup, dmsHosturl=https://ot.dms.dealertrack.com/partsapi.asmx, closeDate=null, searchStartDate=null, soapActionSearch=opentrack.dealertrack.com/CounterTicketSearch, secondaryDMSURL=null, ServerName=itrack7.arkona.com, currentPullTime=2022-02-17T06:12:39Z, dmsPassword=w34$Rp9, secondaryLookupSoapAction=null, </CounterTicketSearchResponseSearchResult><CounterTicketSearchResponseSearchResult><TicketNumber>161696</TicketNumber><SaleType>R</SaleType><PriceLevel>1</PriceLevel> requestXml=<CounterTicketLookup xmlns="opentrack.dealertrack.com">
<Dealer>
<EnterpriseCode>OTIM</EnterpriseCode>
<CompanyNumber>OT2</CompanyNumber>
<ServerName>itrack7.arkona.com</ServerName>
</Dealer>
<LookupParms>
<TicketNumber>161696</TicketNumber>
</LookupParms>
</CounterTicketLookup>
, messageXml=<RepairOrderWrap>
<Dealer>
<DealerID/>
<DealerName/>
<DealerDMS>
<DealerDMSID/>
<DMSName/>
<UserName/>
<Password/>
<HostUrl/>
<EnterpriseCode/>
<CompanyNumber/>
<ServerName/>
</DealerDMS>
</Dealer>
<RepairOrder>
<ID/>
<OrderNumber>161696</OrderNumber>
<OrderType>PO</OrderType>
<Amount>227.90</Amount>
<OrderDate>20220117 00:00:00</OrderDate>
<CloseDate/>
<PrintDate>20220117 00:00:00</PrintDate>
<DealerAssociateID/>
<AssociateDMSID>870</AssociateDMSID>
<Description/>
<OrderStatus>P</OrderStatus>
<NumberOfInvoices/>
<ReadyROData/>
<IsPaid/>
<PaidAmount/>
<IsPaidInKaarma/>
<IsPaymentRequestSent/>
<Tag/>
<MileageText/>
<InvoiceUrl/>
</RepairOrder>
<CustWrap>
<CAttrs>
<IsBusiness/>
<AssignedSA/>
<Comments/>
</CAttrs>
<IdentityAttrs>
<Attr value="SSN"/>
<Attr value="DriverLicense"/>
<Attr value="BirthDate"/>
<Attr value="Gender"/>
<Attr value="Language"/>
</IdentityAttrs>
<CustPref>
<Pref value=""/>
<Pref value="text">Y</Pref>
<Pref value="email">Y</Pref>
<Pref value="call">Y</Pref>
<Pref value="phone"/>
<Pref value="postal">Y</Pref>
</CustPref>
<PreferredComm/>
</RepairOrderWrap>
, dms=ARK, original_expiration=10000}' MessageProperties [headers={x-first-death-exchange=dms.transformer.exchange, x-death=[{reason=expired, original-expiration=20000, count=1, exchange=dms.transformer.exchange, time=Wed Feb 16 22:13:14 PST 2022, routing-keys=[pull.parts.transformer.controller.wait.key], queue=pull.parts.transformer.controller.wait}], x-first-death-reason=expired, x-first-death-queue=pull.parts.transformer.controller.wait, sourceData=(Body:'{soapActionLookup=opentrack.dealertrack.com/CounterTicketLookup, ticketNumber=161696, truePullTime=2022-02-17T06:12:39Z, currentDate=2022-02-17, secondarySearchSoapAction=null, mode=PARTS_LOOKUP, CompanyNumber=OT2, dmsUsername=DTk@aryA, EnterpriseCode=OTIM, seachEndDate=null, dealerId=1374, messageId={"method":"PARTS_PULL","order_number":"161696","dms":"ARK","request_id":"8347fecf-b7db-40ad-aa0e-fa32615b8c5e","dealer_id":"1374","timestamp":1645078359343}, soapAction=opentrack.dealertrack.com/CounterTicketLookup, dmsHosturl=https://ot.dms.dealertrack.com/partsapi.asmx, closeDate=null, searchStartDate=null, soapActionSearch=opentrack.dealertrack.com/CounterTicketSearch, secondaryDMSURL=null, ServerName=itrack7.arkona.com, currentPullTime=2022-02-17T06:12:39Z, dmsPassword=w34$Rp9, secondaryLookupSoapAction=null<InvoiceNumber/><PurchaseOrderNumber/><Total>139.76</Total></CounterTicketSearchResponseSearchResult><CounterTicketSearchResponseSearchResult><TicketNumber>161703</TicketNumber><CounterPersonID>054</CounterPersonID><CustomerKey>1090627</CustomerKey><CustName>EURO MOTORS</CustName><CustPhoneNo>7179200375</CustPhoneNo><SaleType>W</SaleType><PriceLevel>15</PriceLevel><OpenOrTranDate>20220117</OpenOrTranDate><InvoiceNumber/><PurchaseOrderNumber>T093945</PurchaseOrderNumber><Total>471.56</Total></CounterTicketSearchResponseSearchResult></Results></CounterTicketSearchResult></CounterTicketSearchResponse>, lastPullTime=2022-02-16T18:12:39Z, requestXml=<CounterTicketLookup xmlns="opentrack.dealertrack.com">
<Dealer>
<EnterpriseCode>OTIM</EnterpriseCode>
<CompanyNumber>OT2</CompanyNumber>
<ServerName>itrack7.arkona.com</ServerName>
</Dealer>
<LookupParms>
<TicketNumber>161696</TicketNumber>
</LookupParms>
</CounterTicketLookup>
, messageXml=<RepairOrderWrap>
<Dealer>
<DealerID/>
<DealerName/>
<DealerDMS>
<DealerDMSID/>
<DMSName/>
<UserName/>
<Password/>
<HostUrl/>
<EnterpriseCode/>
<CompanyNumber/>
<ServerName/>
</DealerDMS>
</Dealer>
<RepairOrder>
<ID/>
<OrderNumber>161696</OrderNumber>
<OrderType>PO</OrderType>
<Amount>227.90</Amount>
<OrderDate>20220117 00:00:00</OrderDate>
<CloseDate/>
<PrintDate>20220117 00:00:00</PrintDate>
<DealerAssociateID/>
<AssociateDMSID>870</AssociateDMSID>
<Description/>
<OrderStatus>P</OrderStatus>
<NumberOfInvoices/>
<ReadyROData/>
<IsPaid/>
<PaidAmount/>
<IsPaidInKaarma/>
<IsPaymentRequestSent/>
<Tag/>
<MileageText/>
<InvoiceUrl/>
</RepairOrder>
<CustWrap>
<CAttrs>
<IsBusiness/>
<AssignedSA/>
<Comments/>
</CAttrs>
<IdentityAttrs>
<Attr value="SSN"/>
<Attr value="DriverLicense"/>
<Attr value="BirthDate"/>
<Attr value="Gender"/>
<Attr value="Language"/>
</IdentityAttrs>
<CustPref>
<Pref value=""/>
<Pref value="text">Y</Pref>
<Pref value="email">Y</Pref>
<Pref value="call">Y</Pref>
<Pref value="phone"/>
<Pref value="postal">Y</Pref>
</CustPref>
<PreferredComm/>
<Customer>
<ID/>
<Version/>
<CustomerKey>1115218</CustomerKey>
<TypeCode/>
<CustomerType/>
<DealerID/>
<FName>DORIS</FName>
<MName>J</MName>
<LName>KAUFFMAN</LName>
<ImageUrl/>
<Indexed/>
<Valid/>
<OptedOut/>
<UpdatedBy/>
<CreatedBy/>
<Communications>
<Communication>
<ID/>
<Version/>
<Type>P</Type>
<Value>7174681776</Value>
<CreatedBy/>
<Preferred/>
<Valid>1</Valid>
<Label>ct_phone</Label>
<Desc/>
<UpdatedBy/>
<CommPrefs/>
</Communication>
</Communications>
</Customer>
<LastModified/>
<StatusCode/>
<StatusMessage/>
</CustWrap>
</RepairOrderWrap>
, dms=ARK, original_expiration=5000}' MessageProperties [headers={x-first-death-exchange=dms.transformer.exchange, x-death=[{reason=expired, original-expiration=10000, count=1, exchange=dms.transformer.exchange, time=Wed Feb 16 22:12:54 PST 2022, routing-keys=[pull.parts.transformer.controller.wait.key], queue=pull.parts.transformer.controller.wait}], x-first-death-reason=expired, x-first-death-queue=pull.parts.transformer.controller.wait, sourceData=(Body:'{soapActionLookup=opentrack.dealertrack.com/CounterTicketLookup, ticketNumber=161696, truePullTime=2022-02-17T06:12:39Z, currentDate=2022-02-17, secondarySearchSoapAction=null, mode=PARTS_LOOKUP, CompanyNumber=OT2, dmsUsername=DTk@aryA, EnterpriseCode=OTIM, seachEndDate=null, dealerId=1374, messageId={"method":"PARTS_PULL","order_number":"161696","dms":"ARK","request_id":"8347fecf-b7db-40ad-aa0e-fa32615b8c5e","dealer_id":"1374","timestamp":1645078359343}, soapAction=opentrack.dealertrack.com/CounterTicketLookup, dmsHosturl=https://ot.dms.dealertrack.com/partsapi.asmx, closeDate=null, searchStartDate=null, soapActionSearch=opentrack.dealertrack.com/CounterTicketSearch, secondaryDMSURL=null, ServerName=itrack7.arkona.com, currentPullTime=2022-02-17T06:12:39Z, dmsPassword=w34$Rp9, secondaryLookupSoapAction=null, requestXml=<CounterTicketLookup xmlns="opentrack.dealertrack.com">
<Dealer>
<EnterpriseCode>OTIM</EnterpriseCode>
<CompanyNumber>OT2</CompanyNumber>
<ServerName>itrack7.arkona.com</ServerName>
</Dealer>
<LookupParms>
<TicketNumber>161696</TicketNumber>
</LookupParms>
</CounterTicketLookup>
, messageXml=<RepairOrderWrap>
<Dealer>
<DealerID/>
<DealerName/>
<DealerDMS>
<DealerDMSID/>
<DMSName/>
<UserName/>
<Password/>
<HostUrl/>
<EnterpriseCode/>
<CompanyNumber/>
<ServerName/>
</DealerDMS>
</Dealer>
<RepairOrder>
<ID/>
<OrderNumber>161696</OrderNumber>
<OrderType>PO</OrderType>
<Amount>227.90</Amount>
<OrderDate>20220117 00:00:00</OrderDate>
<CloseDate/>
<PrintDate>20220117 00:00:00</PrintDate>
<DealerAssociateID/>
<AssociateDMSID>870</AssociateDMSID>
<Description/>
<OrderStatus>P</OrderStatus>
<NumberOfInvoices/>
<ReadyROData/>
<IsPaid/>
<PaidAmount/>
<IsPaidInKaarma/>
<IsPaymentRequestSent/>
<Tag/>
<MileageText/>
<InvoiceUrl/>
</RepairOrder>
<CustWrap>
<CAttrs>
<IsBusiness/>
<AssignedSA/>
<Comments/>
</CAttrs>
<IdentityAttrs>
<Attr value="SSN"/>
<Attr value="DriverLicense"/>
<Attr value="BirthDate"/>
<Attr value="Gender"/>
<Attr value="Language"/>
</IdentityAttrs>
<Customer>
<ID/>
<Version/>
<CustomerKey>1115218</CustomerKey>
<TypeCode/>
<CustomerType/>
<DealerID/>
<FName>DORIS</FName>
<MName>J</MName>
<LName>KAUFFMAN</LName>
<ImageUrl/>
<Indexed/>
<Valid/>
<OptedOut/>
<UpdatedBy/>
<CreatedBy/>
<Communications>
<Communication>
<ID/>
<Version/>
<Type>P</Type>
<Value>7174681776</Value>
<CreatedBy/>
<LastModified/>
<StatusCode/>
<StatusMessage/>
</CustWrap>
</RepairOrderWrap>
, dms=ARK, original_expiration=null}' MessageProperties [headers={x-first-death-exchange=dms.transformer.exchange, x-death=[{reason=expired, original-expiration=5000, count=1, exchange=dms.transformer.exchange, time=Wed Feb 16 22:12:44 PST 2022, routing-keys=[pull.parts.transformer.controller.wait.key], queue=pull.parts.transformer.controller.wait}], x-first-death-reason=expired, x-first-death-queue=pull.parts.transformer.controller.wait, sourceData=(Body:'{soapActionLookup=opentrack.dealertrack.com/CounterTicketLookup, ticketNumber=161696, truePullTime=2022-02-17T06:12:39Z, currentDate=2022-02-17, secondarySearchSoapAction=null, mode=PARTS_LOOKUP, CompanyNumber=OT2, dmsUsername=DTk@aryA, EnterpriseCode=OTIM, seachEndDate=null, dealerId=1374, messageId={"method":"PARTS_PULL","order_number":"161696","dms":"ARK","request_id":"8347fecf-b7db-40ad-aa0e-fa32615b8c5e","dealer_id":"1374","timestamp":1645078359343}, soapAction=opentrack.dealertrack.com/CounterTicketLookup, dmsHosturl=https://ot.dms.dealertrack.com/partsapi.asmx, closeDate=null, searchStartDate=null, soapActionSearch=opentrack.dealertrack.com/CounterTicketSearch, secondaryDMSURL=null, ServerName=itrack7.arkona.com, currentPullTime=2022-02-17T06:12:39Z, dmsPassword=w34$Rp9, secondaryLookupSoapAction=null, previousResponseXml=<?contentType=application/x-java-serialized-object, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=dms.transformer.exchange, receivedRoutingKey=pull.parts.transformer.controller.key, deliveryTag=7869, consumerTag=amq.ctag-dZLHnCtvYWzpTvPM4ASzjA, consumerQueue=pull.parts.transformer.controller])}, timestamp=Wed Feb 16 22:12:39 PST 2022, messageId=2da8985b-9c80-240b-fb29-7294035750f7, contentType=application/x-java-serialized-object, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=dms.transformer.exchange, receivedRoutingKey=pull.parts.transformer.controller.key, 
deliveryTag=7873, consumerTag=amq.ctag-Fw8OPUahC_jNd--kZIdtSg, consumerQueue=pull.parts.transformer.controller])}, timestamp=Wed Feb 16 22:12:44 PST 2022, messageId=066c4289-5a25-8459-8571-8078421b68d5, contentType=application/x-java-serialized-object, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=dms.transformer.exchange, receivedRoutingKey=pull.parts.transformer.controller.key, deliveryTag=7872, consumerTag=amq.ctag-GYy8qHuHqhZ1s3PyFv5VJA, consumerQueue=pull.parts.transformer.controller])}, timestamp=Wed Feb 16 22:12:54 PST 2022, messageId=5935a433-f7fa-a2c9-f782-d15b4235c91c, contentType=application/x-java-serialized-object, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=dms.transformer.exchange, receivedRoutingKey=pull.parts.transformer.controller.key, deliveryTag=7873, consumerTag=amq.ctag-h6izVs7ziHOH93-M_CBhvw, consumerQueue=pull.parts.transformer.controller]), amqp_consumerTag=amq.ctag-WdkMr705y_K0iJUOnbGTug, contentType=application/x-java-serialized-object, timestamp=1645078434806}]
```
Notice how spring-integration / spring-integration-amqp adds the message body (as the sourceData header) and the other headers again each time the message passes through the dead letter queue.
Can someone please point us to a setting that disables this repeated addition of headers? It is causing the following error with RabbitMQ:
caused by: java.lang.IllegalArgumentException: Content headers exceeded max frame size: 132603 > 131072
基本上,消息 header 变得非常大,以至于超过了最大帧大小(131072 字节)的限制。这个问题在 5.0.6 版本中不会出现。
更新:
我们在消息体中放入了一个自定义键 original_expiration,它的值取自 x-death header:
/**
 * Copies the "original-expiration" value of the first "x-death" entry into the payload.
 *
 * When the headers contain no "x-death" key, the payload key "original_expiration" is set
 * to null. When "x-death" is present and its first entry carries "original-expiration",
 * that value is stored (as a String) under "original_expiration". In every other case the
 * payload is left untouched.
 *
 * @param messageHeaders headers of the failed message
 * @param message        payload map that receives the "original_expiration" entry
 */
public void processFailedMessage(MessageHeaders messageHeaders, HashMap<String,Object> message)
{
    if (!messageHeaders.containsKey("x-death")) {
        message.put("original_expiration", null);
        return;
    }
    List<?> deathList = (List<?>) messageHeaders.get("x-death");
    //logger.debug(message.get("messageId")+" "+deathList);
    if (deathList == null || deathList.isEmpty()) {
        return;
    }
    // Depend only on the Map interface: casting each entry to HashMap fails with a
    // ClassCastException as soon as the entry is any other Map implementation.
    java.util.Map<?, ?> death = (java.util.Map<?, ?>) deathList.get(0);
    if (death.containsKey("original-expiration")) {
        Object originalExpiration = death.get("original-expiration");
        // Store a String without a hard (String) cast, so a non-String value does not throw here.
        message.put("original_expiration", originalExpiration == null ? null : originalExpiration.toString());
        logger.info("original-expiration = "+originalExpiration);
    }
}
然后我们用 header-enricher 来设置 amqp_expiration header:
/**
 * Computes the next expiration value for a failed message.
 *
 * Without a previous expiration ("original_expiration" is null) the configured initial
 * expiration is returned. Otherwise the retry count is estimated as
 * round(log(previous / initial) / log(multiplier)); while retryCount + 1 is below the
 * configured maximum the previous expiration multiplied by the multiplier is returned,
 * and null is returned once the maximum is reached.
 *
 * NOTE(review): assumes initialexpiration > 0 and multiplier > 1 - TODO confirm against
 * the configuration; other values make the logarithm ratio meaningless.
 *
 * @param message payload map; "original_expiration" is read as a numeric String
 * @return the new expiration as a String, or null when no further retry should happen
 * @throws NumberFormatException if "original_expiration" is not a parsable integer
 */
public String updateExpiration(HashMap<String, Object> message)
{
    //logger.debug(message.get("messageId")+" original_expiration = "+(String) message.get("original_expiration")+" initialexpiration = "+this.initialexpiration+" multiplier = "+this.multiplier+" maximumretries = "+this.maximumretries);
    Integer newExpiration = null;
    Object previousRaw = message.get("original_expiration");
    if (previousRaw == null)
    {
        newExpiration = this.initialexpiration;
    }
    else
    {
        // Parse once and reuse for both the ratio and the multiplication.
        int previous = Integer.parseInt((String) previousRaw);
        // Divide in floating point: integer division truncates the ratio first, so a
        // previous value smaller than the initial one becomes 0 and log(0) is -Infinity.
        double x = Math.log((double) previous / this.initialexpiration);
        double y = Math.log((double) this.multiplier);
        long retryCount = Math.round(x/y);
        logger.info(message.get("messageId")+" "+x+" "+y+" Retried "+(retryCount+1)+" of "+this.maximumretries);
        if ((retryCount + 1) < this.maximumretries)
        {
            newExpiration = previous * this.multiplier;
        }
        // else: retries exhausted, newExpiration stays null
    }
    String result = newExpiration != null ? String.valueOf(newExpiration) : null;
    logger.info(" NewExpiration = "+result);
    return result;
}
UPDATE :
For the DLQ logic: after updating the expiration header we send the message to a router, which checks whether the value of amqp_expiration is empty. Based on that, it sends the message either to the wait queue (which dead-letters back to the main queue) or to a failed queue.
<!-- Wait queue: configured with dead-letter exchange dms.transformer.exchange and
     dead-letter routing key pull.parts.transformer.controller.key. -->
<rabbit:queue name="pull.parts.transformer.controller.wait">
    <rabbit:queue-arguments>
        <entry key="x-dead-letter-exchange" value="dms.transformer.exchange"/>
        <entry key="x-dead-letter-routing-key" value="pull.parts.transformer.controller.key"/>
    </rabbit:queue-arguments>
</rabbit:queue>
/**
 * Routes a failed parts-order message according to its "amqp_expiration" header.
 *
 * Returns the "wait" destination when the header holds a usable value, and the "failed"
 * destination when the header is missing, null, empty, or the literal string "null".
 *
 * @param message        payload of the failed message (only "messageId" is read, for logging)
 * @param messageHeaders headers of the failed message
 * @return FromPullPartsOrderErrorMessageRouterWait or FromPullPartsOrderErrorMessageRouterFailed
 */
@Router(inputChannel="toPullPartsOrderErrorMessageRouter")
public String processPartsOrderFailedMessageExpiration(@Payload HashMap<String,Object> message, @Headers MessageHeaders messageHeaders)
{
    logger.info("{} {}",message.get("messageId"),messageHeaders);
    // A missing header reads as null here, so it falls into the same branch as an explicit null.
    String ttl = (String) messageHeaders.get("amqp_expiration");
    //logger.debug("{} {}",message.get("messageId"),ttl);
    boolean hasUsableExpiration = ttl != null && !ttl.isEmpty() && !ttl.equals("null");
    return hasUsableExpiration
            ? FromPullPartsOrderErrorMessageRouterWait
            : FromPullPartsOrderErrorMessageRouterFailed;
}
<!-- Inbound flow: messages from queue pull.parts.transformer.controller are delivered to
     PullPartsChannel and handled by syncer.onPartsOrderRequestRecievedFromDMS.
     Errors are sent to errorChannelPullParts. -->
<int:channel id="PullPartsChannel"/>
<int:service-activator input-channel="PullPartsChannel"
                       ref="syncer"
                       method="onPartsOrderRequestRecievedFromDMS"/>
<int-amqp:inbound-channel-adapter channel="PullPartsChannel"
                                  error-channel="errorChannelPullParts"
                                  queue-names="pull.parts.transformer.controller"
                                  concurrent-consumers="${partsorderpullthreads}"
                                  connection-factory="rabbitConnectionFactory"
                                  header-mapper="syncerHeaderMapper"
                                  prefetch-count="${partsorderpullfetchcount}"/>

<!-- Error flow: errorhelper.onErrorInPullParts handles the error message, then the
     header-enricher sets amqp_expiration from errorhelper.updateExpiration before routing. -->
<int:channel id="fromPullPartsErrorHandler"/>
<int:service-activator id="errorHandlerPullParts"
                       input-channel="errorChannelPullParts"
                       output-channel="fromPullPartsErrorHandler"
                       ref="errorhelper"
                       method="onErrorInPullParts"/>
<int:header-enricher input-channel="fromPullPartsErrorHandler"
                     output-channel="toPullPartsOrderErrorMessageRouter">
    <int:header name="amqp_expiration" method="updateExpiration" ref="errorhelper"/>
</int:header-enricher>
<!-- look at com.kaarma.syncer.utility.FailedMessageRouter -->
<!-- NOTE(review): neither adapter below sets mapped-request-headers or a header-mapper,
     so outbound header mapping is left at the adapter defaults. -->

<!-- Retry path: publishes to the wait queue's routing key. -->
<int:channel id="fromPullPartsOrderErrorMessageRouterWait"/>
<int-amqp:outbound-channel-adapter channel="fromPullPartsOrderErrorMessageRouterWait"
                                   amqp-template="rabbitTemplate"
                                   exchange-name="dms.transformer.exchange"
                                   routing-key="pull.parts.transformer.controller.wait.key"/>

<!-- Failure path: publishes to the failed routing key. -->
<int:channel id="fromPullPartsOrderErrorMessageRouterFailed"/>
<int-amqp:outbound-channel-adapter channel="fromPullPartsOrderErrorMessageRouterFailed"
                                   amqp-template="rabbitTemplate"
                                   exchange-name="dms.transformer.exchange"
                                   routing-key="pull.parts.transformer.controller.failed.key"/>
您可以配置 <int-amqp:outbound-channel-adapter>
以仅映射您感兴趣的那些 headers。默认情况下它映射所有:
/**
 * Returns the default outbound header patterns: "!x-*" followed by "*".
 */
private static String[] safeOutboundHeaders() {
    String[] patterns = new String[2];
    patterns[0] = "!x-*";
    patterns[1] = "*";
    return patterns;
}
参见 mapped-request-headers
属性和文档:https://docs.spring.io/spring-integration/docs/current/reference/html/amqp.html#amqp-message-headers。
不过,您发送到此 AMQP 通道适配器的消息中究竟带有哪些自定义 headers,问题中并没有展示,所以目前还不清楚;毕竟“复制所有 headers”的逻辑一直都存在。
这对我有用
/**
 * Outbound AMQP header mapper whose request and reply header names are set to the
 * STANDARD_REQUEST_HEADER_NAME_PATTERN and STANDARD_REPLY_HEADER_NAME_PATTERN constants.
 */
@Bean
public DefaultAmqpHeaderMapper outboundMapper() {
    final DefaultAmqpHeaderMapper mapper = DefaultAmqpHeaderMapper.outboundMapper();
    mapper.setRequestHeaderNames(DefaultAmqpHeaderMapper.STANDARD_REQUEST_HEADER_NAME_PATTERN);
    mapper.setReplyHeaderNames(DefaultAmqpHeaderMapper.STANDARD_REPLY_HEADER_NAME_PATTERN);
    return mapper;
}
在把消息从 Spring Integration 通道发送到 RabbitMQ 时,我在 outbound-channel-adapter 中使用了这个 bean