Spring 集成 amqp 重复添加 headers

Spring integration amqp adding headers repetitively

我们长期以来一直在使用 spring 集成核心和 spring 集成 amqp 以及 rabbitm-mq。我们的服务还通过使用 x-death header 和 amaqp-expiration header 来使用死信机制,直到我们决定升级 spring-integration 的版本之前,它一直运行良好.

之前的版本:5.0.6.RELEASE

新版本:5.2.4.RELEASE

Rabbit mq headers 之前的版本

    MessageProperties [headers={x-first-death-exchange=dms.arkona.exchange, x-death=[{reason=expired, original-expiration=5000, count=1, exchange=dms.arkona.exchange, time=Mon Feb 14 18:03:11 PST 2022, routing-keys=[push.customer.arkona.controller.update.wait.key], queue=push.customer.arkona.controller.update.wait}], x-first-death-reason=expired, x-first-death-queue=push.customer.arkona.controller.update.wait}, contentType=application/x-java-serialized-object, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=dms.arkona.exchange, receivedRoutingKey=push.customer.arkona.controller.update.key, deliveryTag=2407, consumerTag=amq.ctag-1_bYuEGts-Rfk6fqDyIqBw, consumerQueue=push.customer.arkona.controller.update]), amqp_expiration=10000, id=b77509b2-da63-1be0-ffd8-b96cbdc12c89, timestamp=1644890591793}

Rabbit mq headers 新版本

   headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=dms.transformer.exchange, amqp_deliveryTag=7875, amqp_consumerQueue=pull.parts.transformer.controller, amqp_redelivered=false, amqp_receivedRoutingKey=pull.parts.transformer.controller.key, x-first-death-exchange=dms.transformer.exchange, amqp_timestamp=Wed Feb 16 22:13:14 PST 2022, amqp_messageId=0df5a50e-9c15-ad1d-036d-a98993d2fa92, x-death=[{reason=expired, original-expiration=40000, count=1, exchange=dms.transformer.exchange, time=Wed Feb 16 22:13:54 PST 2022, routing-keys=[pull.parts.transformer.controller.wait.key], queue=pull.parts.transformer.controller.wait}], x-first-death-reason=expired, x-first-death-queue=pull.parts.transformer.controller.wait, id=8c7b4d13-5a20-b430-05fc-492ed98971fc, sourceData=(Body:'{soapActionLookup=opentrack.dealertrack.com/CounterTicketLookup, ticketNumber=161696, truePullTime=2022-02-17T06:12:39Z, currentDate=2022-02-17, secondarySearchSoapAction=null, mode=PARTS_LOOKUP, CompanyNumber=OT2, dmsUsername=DTk@aryA, EnterpriseCode=OTIM, seachEndDate=null, dealerId=1374, messageId={"method":"PARTS_PULL","order_number":"161696","dms":"ARK","request_id":"8347fecf-b7db-40ad-aa0e-fa32615b8c5e","dealer_id":"1374","timestamp":1645078359343}, soapAction=opentrack.dealertrack.com/CounterTicketLookup, dmsHosturl=https://ot.dms.dealertrack.com/partsapi.asmx, closeDate=null, searchStartDate=null, soapActionSearch=opentrack.dealertrack.com/CounterTicketSearch, secondaryDMSURL=null, ServerName=itrack7.arkona.com, currentPullTime=2022-02-17T06:12:39Z, dmsPassword=w34$Rp9, secondaryLookupSoapAction=null, </CounterTicketSearchResponseSearchResult><CounterTicketSearchResponseSearchResult><TicketNumber>161696</TicketNumber><SaleType>R</SaleType><PriceLevel>1</PriceLevel> requestXml=<CounterTicketLookup xmlns="opentrack.dealertrack.com">
           <Dealer>
              <EnterpriseCode>OTIM</EnterpriseCode>
              <CompanyNumber>OT2</CompanyNumber>
              <ServerName>itrack7.arkona.com</ServerName>
           </Dealer>
           <LookupParms>
               <TicketNumber>161696</TicketNumber>
           </LookupParms>
  </CounterTicketLookup>
  , messageXml=<RepairOrderWrap>
     <Dealer>
        <DealerID/>
        <DealerName/>
        <DealerDMS>
           <DealerDMSID/>
           <DMSName/>
           <UserName/>
           <Password/>
           <HostUrl/>
           <EnterpriseCode/>
           <CompanyNumber/>
           <ServerName/>
        </DealerDMS>
     </Dealer>
     <RepairOrder>
        <ID/>
        <OrderNumber>161696</OrderNumber>
        <OrderType>PO</OrderType>
        <Amount>227.90</Amount>
        <OrderDate>20220117 00:00:00</OrderDate>
        <CloseDate/>
        <PrintDate>20220117 00:00:00</PrintDate>
        <DealerAssociateID/>
        <AssociateDMSID>870</AssociateDMSID>
        <Description/>
        <OrderStatus>P</OrderStatus>
        <NumberOfInvoices/>
        <ReadyROData/>
        <IsPaid/>
        <PaidAmount/>
        <IsPaidInKaarma/>
        <IsPaymentRequestSent/>
        <Tag/>
        <MileageText/>
        <InvoiceUrl/>
     </RepairOrder>
     <CustWrap>
        <CAttrs>
           <IsBusiness/>
           <AssignedSA/>
           <Comments/>
        </CAttrs>
        <IdentityAttrs>
           <Attr value="SSN"/>
           <Attr value="DriverLicense"/>
           <Attr value="BirthDate"/>
           <Attr value="Gender"/>
           <Attr value="Language"/>
        </IdentityAttrs>
        <CustPref>
           <Pref value=""/>
           <Pref value="text">Y</Pref>
           <Pref value="email">Y</Pref>
           <Pref value="call">Y</Pref>
           <Pref value="phone"/>
           <Pref value="postal">Y</Pref>
        </CustPref>
        <PreferredComm/>
        
             
  </RepairOrderWrap>
  , dms=ARK, original_expiration=10000}' MessageProperties [headers={x-first-death-exchange=dms.transformer.exchange, x-death=[{reason=expired, original-expiration=20000, count=1, exchange=dms.transformer.exchange, time=Wed Feb 16 22:13:14 PST 2022, routing-keys=[pull.parts.transformer.controller.wait.key], queue=pull.parts.transformer.controller.wait}], x-first-death-reason=expired, x-first-death-queue=pull.parts.transformer.controller.wait, sourceData=(Body:'{soapActionLookup=opentrack.dealertrack.com/CounterTicketLookup, ticketNumber=161696, truePullTime=2022-02-17T06:12:39Z, currentDate=2022-02-17, secondarySearchSoapAction=null, mode=PARTS_LOOKUP, CompanyNumber=OT2, dmsUsername=DTk@aryA, EnterpriseCode=OTIM, seachEndDate=null, dealerId=1374, messageId={"method":"PARTS_PULL","order_number":"161696","dms":"ARK","request_id":"8347fecf-b7db-40ad-aa0e-fa32615b8c5e","dealer_id":"1374","timestamp":1645078359343}, soapAction=opentrack.dealertrack.com/CounterTicketLookup, dmsHosturl=https://ot.dms.dealertrack.com/partsapi.asmx, closeDate=null, searchStartDate=null, soapActionSearch=opentrack.dealertrack.com/CounterTicketSearch, secondaryDMSURL=null, ServerName=itrack7.arkona.com, currentPullTime=2022-02-17T06:12:39Z, dmsPassword=w34$Rp9, secondaryLookupSoapAction=null<InvoiceNumber/><PurchaseOrderNumber/><Total>139.76</Total></CounterTicketSearchResponseSearchResult><CounterTicketSearchResponseSearchResult><TicketNumber>161703</TicketNumber><CounterPersonID>054</CounterPersonID><CustomerKey>1090627</CustomerKey><CustName>EURO MOTORS</CustName><CustPhoneNo>7179200375</CustPhoneNo><SaleType>W</SaleType><PriceLevel>15</PriceLevel><OpenOrTranDate>20220117</OpenOrTranDate><InvoiceNumber/><PurchaseOrderNumber>T093945</PurchaseOrderNumber><Total>471.56</Total></CounterTicketSearchResponseSearchResult></Results></CounterTicketSearchResult></CounterTicketSearchResponse>, lastPullTime=2022-02-16T18:12:39Z, requestXml=<CounterTicketLookup xmlns="opentrack.dealertrack.com">
           <Dealer>
              <EnterpriseCode>OTIM</EnterpriseCode>
              <CompanyNumber>OT2</CompanyNumber>
              <ServerName>itrack7.arkona.com</ServerName>
           </Dealer>
           <LookupParms>
               <TicketNumber>161696</TicketNumber>
           </LookupParms>
  </CounterTicketLookup>
  , messageXml=<RepairOrderWrap>
     <Dealer>
        <DealerID/>
        <DealerName/>
        <DealerDMS>
           <DealerDMSID/>
           <DMSName/>
           <UserName/>
           <Password/>
           <HostUrl/>
           <EnterpriseCode/>
           <CompanyNumber/>
           <ServerName/>
        </DealerDMS>
     </Dealer>
     <RepairOrder>
        <ID/>
        <OrderNumber>161696</OrderNumber>
        <OrderType>PO</OrderType>
        <Amount>227.90</Amount>
        <OrderDate>20220117 00:00:00</OrderDate>
        <CloseDate/>
        <PrintDate>20220117 00:00:00</PrintDate>
        <DealerAssociateID/>
        <AssociateDMSID>870</AssociateDMSID>
        <Description/>
        <OrderStatus>P</OrderStatus>
        <NumberOfInvoices/>
        <ReadyROData/>
        <IsPaid/>
        <PaidAmount/>
        <IsPaidInKaarma/>
        <IsPaymentRequestSent/>
        <Tag/>
        <MileageText/>
        <InvoiceUrl/>
     </RepairOrder>
     <CustWrap>
        <CAttrs>
           <IsBusiness/>
           <AssignedSA/>
           <Comments/>
        </CAttrs>
        <IdentityAttrs>
           <Attr value="SSN"/>
           <Attr value="DriverLicense"/>
           <Attr value="BirthDate"/>
           <Attr value="Gender"/>
           <Attr value="Language"/>
        </IdentityAttrs>
        <CustPref>
           <Pref value=""/>
           <Pref value="text">Y</Pref>
           <Pref value="email">Y</Pref>
           <Pref value="call">Y</Pref>
           <Pref value="phone"/>
           <Pref value="postal">Y</Pref>
        </CustPref>
        <PreferredComm/>
        <Customer>
           <ID/>
           <Version/>
           <CustomerKey>1115218</CustomerKey>
           <TypeCode/>
           <CustomerType/>
           <DealerID/>
           <FName>DORIS</FName>
           <MName>J</MName>
           <LName>KAUFFMAN</LName>
           <ImageUrl/>
           <Indexed/>
           <Valid/>
           <OptedOut/>
           <UpdatedBy/>
           <CreatedBy/>
           <Communications>
              <Communication>
                 <ID/>
                 <Version/>
                 <Type>P</Type>
                 <Value>7174681776</Value>
                 <CreatedBy/>
                 <Preferred/>
                 <Valid>1</Valid>
                 <Label>ct_phone</Label>
                 <Desc/>
                 <UpdatedBy/>
                 <CommPrefs/>
              </Communication>
           </Communications>
        </Customer>
        <LastModified/>
        <StatusCode/>
        <StatusMessage/>
     </CustWrap>
  </RepairOrderWrap>
  , dms=ARK, original_expiration=5000}' MessageProperties [headers={x-first-death-exchange=dms.transformer.exchange, x-death=[{reason=expired, original-expiration=10000, count=1, exchange=dms.transformer.exchange, time=Wed Feb 16 22:12:54 PST 2022, routing-keys=[pull.parts.transformer.controller.wait.key], queue=pull.parts.transformer.controller.wait}], x-first-death-reason=expired, x-first-death-queue=pull.parts.transformer.controller.wait, sourceData=(Body:'{soapActionLookup=opentrack.dealertrack.com/CounterTicketLookup, ticketNumber=161696, truePullTime=2022-02-17T06:12:39Z, currentDate=2022-02-17, secondarySearchSoapAction=null, mode=PARTS_LOOKUP, CompanyNumber=OT2, dmsUsername=DTk@aryA, EnterpriseCode=OTIM, seachEndDate=null, dealerId=1374, messageId={"method":"PARTS_PULL","order_number":"161696","dms":"ARK","request_id":"8347fecf-b7db-40ad-aa0e-fa32615b8c5e","dealer_id":"1374","timestamp":1645078359343}, soapAction=opentrack.dealertrack.com/CounterTicketLookup, dmsHosturl=https://ot.dms.dealertrack.com/partsapi.asmx, closeDate=null, searchStartDate=null, soapActionSearch=opentrack.dealertrack.com/CounterTicketSearch, secondaryDMSURL=null, ServerName=itrack7.arkona.com, currentPullTime=2022-02-17T06:12:39Z, dmsPassword=w34$Rp9, secondaryLookupSoapAction=null, requestXml=<CounterTicketLookup xmlns="opentrack.dealertrack.com">
           <Dealer>
              <EnterpriseCode>OTIM</EnterpriseCode>
              <CompanyNumber>OT2</CompanyNumber>
              <ServerName>itrack7.arkona.com</ServerName>
           </Dealer>
           <LookupParms>
               <TicketNumber>161696</TicketNumber>
           </LookupParms>
  </CounterTicketLookup>
  , messageXml=<RepairOrderWrap>
     <Dealer>
        <DealerID/>
        <DealerName/>
        <DealerDMS>
           <DealerDMSID/>
           <DMSName/>
           <UserName/>
           <Password/>
           <HostUrl/>
           <EnterpriseCode/>
           <CompanyNumber/>
           <ServerName/>
        </DealerDMS>
     </Dealer>
     <RepairOrder>
        <ID/>
        <OrderNumber>161696</OrderNumber>
        <OrderType>PO</OrderType>
        <Amount>227.90</Amount>
        <OrderDate>20220117 00:00:00</OrderDate>
        <CloseDate/>
        <PrintDate>20220117 00:00:00</PrintDate>
        <DealerAssociateID/>
        <AssociateDMSID>870</AssociateDMSID>
        <Description/>
        <OrderStatus>P</OrderStatus>
        <NumberOfInvoices/>
        <ReadyROData/>
        <IsPaid/>
        <PaidAmount/>
        <IsPaidInKaarma/>
        <IsPaymentRequestSent/>
        <Tag/>
        <MileageText/>
        <InvoiceUrl/>
     </RepairOrder>
     <CustWrap>
        <CAttrs>
           <IsBusiness/>
           <AssignedSA/>
           <Comments/>
        </CAttrs>
        <IdentityAttrs>
           <Attr value="SSN"/>
           <Attr value="DriverLicense"/>
           <Attr value="BirthDate"/>
           <Attr value="Gender"/>
           <Attr value="Language"/>
        </IdentityAttrs>
      
        <Customer>
           <ID/>
           <Version/>
           <CustomerKey>1115218</CustomerKey>
           <TypeCode/>
           <CustomerType/>
           <DealerID/>
           <FName>DORIS</FName>
           <MName>J</MName>
           <LName>KAUFFMAN</LName>
           <ImageUrl/>
           <Indexed/>
           <Valid/>
           <OptedOut/>
           <UpdatedBy/>
           <CreatedBy/>
           <Communications>
              <Communication>
                 <ID/>
                 <Version/>
                 <Type>P</Type>
                 <Value>7174681776</Value>
                 <CreatedBy/>
                
        <LastModified/>
        <StatusCode/>
        <StatusMessage/>
     </CustWrap>
  </RepairOrderWrap>
  , dms=ARK, original_expiration=null}' MessageProperties [headers={x-first-death-exchange=dms.transformer.exchange, x-death=[{reason=expired, original-expiration=5000, count=1, exchange=dms.transformer.exchange, time=Wed Feb 16 22:12:44 PST 2022, routing-keys=[pull.parts.transformer.controller.wait.key], queue=pull.parts.transformer.controller.wait}], x-first-death-reason=expired, x-first-death-queue=pull.parts.transformer.controller.wait, sourceData=(Body:'{soapActionLookup=opentrack.dealertrack.com/CounterTicketLookup, ticketNumber=161696, truePullTime=2022-02-17T06:12:39Z, currentDate=2022-02-17, secondarySearchSoapAction=null, mode=PARTS_LOOKUP, CompanyNumber=OT2, dmsUsername=DTk@aryA, EnterpriseCode=OTIM, seachEndDate=null, dealerId=1374, messageId={"method":"PARTS_PULL","order_number":"161696","dms":"ARK","request_id":"8347fecf-b7db-40ad-aa0e-fa32615b8c5e","dealer_id":"1374","timestamp":1645078359343}, soapAction=opentrack.dealertrack.com/CounterTicketLookup, dmsHosturl=https://ot.dms.dealertrack.com/partsapi.asmx, closeDate=null, searchStartDate=null, soapActionSearch=opentrack.dealertrack.com/CounterTicketSearch, secondaryDMSURL=null, ServerName=itrack7.arkona.com, currentPullTime=2022-02-17T06:12:39Z, dmsPassword=w34$Rp9, secondaryLookupSoapAction=null, previousResponseXml=<?contentType=application/x-java-serialized-object, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=dms.transformer.exchange, receivedRoutingKey=pull.parts.transformer.controller.key, deliveryTag=7869, consumerTag=amq.ctag-dZLHnCtvYWzpTvPM4ASzjA, consumerQueue=pull.parts.transformer.controller])}, timestamp=Wed Feb 16 22:12:39 PST 2022, messageId=2da8985b-9c80-240b-fb29-7294035750f7, contentType=application/x-java-serialized-object, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=dms.transformer.exchange, receivedRoutingKey=pull.parts.transformer.controller.key, deliveryTag=7873, consumerTag=amq.ctag-Fw8OPUahC_jNd--kZIdtSg, consumerQueue=pull.parts.transformer.controller])}, timestamp=Wed Feb 16 22:12:44 PST 2022, messageId=066c4289-5a25-8459-8571-8078421b68d5, contentType=application/x-java-serialized-object, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=dms.transformer.exchange, receivedRoutingKey=pull.parts.transformer.controller.key, deliveryTag=7872, consumerTag=amq.ctag-GYy8qHuHqhZ1s3PyFv5VJA, consumerQueue=pull.parts.transformer.controller])}, timestamp=Wed Feb 16 22:12:54 PST 2022, messageId=5935a433-f7fa-a2c9-f782-d15b4235c91c, contentType=application/x-java-serialized-object, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=dms.transformer.exchange, receivedRoutingKey=pull.parts.transformer.controller.key, deliveryTag=7873, consumerTag=amq.ctag-h6izVs7ziHOH93-M_CBhvw, consumerQueue=pull.parts.transformer.controller]), amqp_consumerTag=amq.ctag-WdkMr705y_K0iJUOnbGTug, contentType=application/x-java-serialized-object, timestamp=1645078434806}]
  ```
  
  Notice that how spring-integration/spring-integration amqp is adding message body to header and other headers each time the message is passed to dead letter queue
  
  Can someone pls guide us to any setting where in we can disable this repetitive addition of headers . As it is causing the following error in our rabbit mq

caused by: java.lang.IllegalArgumentException: Content headers exceeded max frame size: 132603 > 131072

基本上,消息 header 变得非常大,以至于超过了限制。它不会发生在 5.0.6 版本

更新:

我们放了一个像 original-expiration 这样的自定义键,它的值形式是 x-death header

public void processFailedMessage(MessageHeaders messageHeaders, HashMap<String,Object> message)
    {
        if (messageHeaders.containsKey("x-death")) {
            List<HashMap<String, Object>> deathList = (List<HashMap<String, Object>>) messageHeaders
                    .get("x-death");
            //logger.debug(message.get("messageId")+" "+deathList);
            if (deathList.size() > 0) {
                HashMap<String, Object> death = deathList.get(0);
                if (death.containsKey("original-expiration")) {
                    message.put("original_expiration", (String) death.get("original-expiration"));
                    logger.info("original-expiration = "+death.get("original-expiration"));
                } 
            } 
        } else {
            message.put("original_expiration", null);
        }
    }

然后我们用header-enricher来丰富amqp-expirationheader

public String updateExpiration(HashMap<String, Object> message)
    {
        //logger.debug(message.get("messageId")+" original_expiration = "+(String) message.get("original_expiration")+" initialexpiration = "+this.initialexpiration+" multiplier = "+this.multiplier+" maximumretries = "+this.maximumretries);
        Integer newExpiration = null;
        if(message.get("original_expiration") == null )
        {
            newExpiration = this.initialexpiration;
        
        }
        else
        {   
            double x = Math.log((double)(Integer.parseInt((String) message.get("original_expiration"))/this.initialexpiration));
            double y = Math.log((double)this.multiplier);
            long retryCount = Math.round(x/y);
            logger.info(message.get("messageId")+" "+x+" "+y+" Retried "+(retryCount+1)+" of "+this.maximumretries);
            if((retryCount + 1) >= this.maximumretries)
            {
                newExpiration = null;
            }
            else
            {
                newExpiration = Integer.parseInt((String) message.get("original_expiration"))*this.multiplier;
            }
        }
        logger.info(" NewExpiration = "+(newExpiration!=null?String.valueOf(newExpiration):null));
        return newExpiration!=null?String.valueOf(newExpiration):null;
    }




UPDATE : 

For dlq logic after updating the expiration header we send to the message to a router where it heck if the value of amqp expiration is not empty or empty. Based on that it sends the message either to a dead letter queue or a failed queuue

<rabbit:queue
        name="pull.parts.transformer.controller.wait">
        <rabbit:queue-arguments>
            <entry key="x-dead-letter-exchange"
                value="dms.transformer.exchange" />
            <entry key="x-dead-letter-routing-key"
                value="pull.parts.transformer.controller.key" />
        </rabbit:queue-arguments>
    </rabbit:queue>


@Router(inputChannel="toPullPartsOrderErrorMessageRouter")
    public String processPartsOrderFailedMessageExpiration(@Payload HashMap<String,Object> message, @Headers MessageHeaders messageHeaders) 
    {
        logger.info("{} {}",message.get("messageId"),messageHeaders);
        if(messageHeaders.containsKey("amqp_expiration"))
        {
            String expiration = (String) messageHeaders.get("amqp_expiration");
            //logger.debug("{} {}",message.get("messageId"),expiration);
            if(expiration==null || expiration.isEmpty() || expiration.equals("null"))
            {
                return FromPullPartsOrderErrorMessageRouterFailed;
            }
            return FromPullPartsOrderErrorMessageRouterWait;
            
        }
        else
        {
            return FromPullPartsOrderErrorMessageRouterFailed;
        }
    }


<int:channel id="PullPartsChannel"/>

<int:service-activator input-channel="PullPartsChannel" 
        ref="syncer" method="onPartsOrderRequestRecievedFromDMS"/>
       

       

<int-amqp:inbound-channel-adapter channel="PullPartsChannel"
         error-channel = "errorChannelPullParts"  queue-names="pull.parts.transformer.controller" concurrent-consumers="${partsorderpullthreads}" 
         connection-factory="rabbitConnectionFactory" header-mapper="syncerHeaderMapper" prefetch-count="${partsorderpullfetchcount}" />
        
<int:channel id="fromPullPartsErrorHandler"/>
<int:service-activator id="errorHandlerPullParts" input-channel="errorChannelPullParts" output-channel="fromPullPartsErrorHandler" 
        ref="errorhelper" method="onErrorInPullParts" />
<int:header-enricher input-channel="fromPullPartsErrorHandler" output-channel="toPullPartsOrderErrorMessageRouter">
               <int:header name="amqp_expiration" method="updateExpiration" ref="errorhelper"/>
</int:header-enricher>      
<!-- look at  com.kaarma.syncer.utility.FailedMessageRouter -->
<int:channel id="fromPullPartsOrderErrorMessageRouterWait"/>        
<int-amqp:outbound-channel-adapter
        channel="fromPullPartsOrderErrorMessageRouterWait" amqp-template="rabbitTemplate" exchange-name="dms.transformer.exchange"
        routing-key="pull.parts.transformer.controller.wait.key" /> 
<int:channel id="fromPullPartsOrderErrorMessageRouterFailed"/>      
<int-amqp:outbound-channel-adapter
        channel="fromPullPartsOrderErrorMessageRouterFailed" amqp-template="rabbitTemplate" exchange-name="dms.transformer.exchange"
        routing-key="pull.parts.transformer.controller.failed.key" />

        

您可以配置 <int-amqp:outbound-channel-adapter> 以仅映射您感兴趣的那些 headers。默认情况下它映射所有:

private static String[] safeOutboundHeaders() {
    return new String[] { "!x-*", "*" };
}

参见 mapped-request-headers 属性和文档:https://docs.spring.io/spring-integration/docs/current/reference/html/amqp.html#amqp-message-headers

目前尚不清楚在您发送到此 AMQP 通道适配器的消息中没有显示自定义 headers 的是什么。只是因为“复制所有 headers”逻辑永远存在。

这对我有用

@Bean
public DefaultAmqpHeaderMapper outboundMapper() {
    DefaultAmqpHeaderMapper amqpHeaderMapper = DefaultAmqpHeaderMapper.outboundMapper();
    amqpHeaderMapper.setRequestHeaderNames(DefaultAmqpHeaderMapper.STANDARD_REQUEST_HEADER_NAME_PATTERN);
    amqpHeaderMapper.setReplyHeaderNames(DefaultAmqpHeaderMapper.STANDARD_REPLY_HEADER_NAME_PATTERN);
    return amqpHeaderMapper;
}

我在 outbound-channel-adapter 中使用了这个 bean,同时将消息从 spring 集成通道发送到 rmq