WSO2 EI 6.6.0 与 RabbitMQ 3.8.5 - 无法在 rabbitmq 控制台中获得准确的发布有效负载

WSO2 EI 6.6.0 with RabbitMQ 3.8.5 - Unable to get exact posted payload in rabbitmq console

我只是想 post 通过使用 WSO2 EI 6.6.0 中的 messageStore 将传入的有效负载发送到 rabbitmq。

API-代码:

<api context="/testmqload" name="Test" xmlns="http://ws.apache.org/ns/synapse">
    <resource methods="POST">
        <inSequence>
            <log level="custom">
                <property expression="json-eval($)" name="Test"/>
            </log>
           
            <property name="OUT_ONLY" value="true"/>
            <property name="FORCE_SC_ACCEPTED" value="true" scope="axis2"/>
            <property name="messageType" value="application/json" scope="axis2"/>
            <store messageStore="SafaricomRequestStore"/>
        </inSequence>
        <outSequence/>
        <faultSequence/>
    </resource>
</api>

消息存储区:

<?xml version="1.0" encoding="UTF-8"?>
<messageStore class="org.apache.synapse.message.store.impl.rabbitmq.RabbitMQStore" name="SafaricomRequestStore" xmlns="http://ws.apache.org/ns/synapse">
    <parameter name="store.rabbitmq.host.name">puffin.rmq2.cloudamqp.com</parameter>
    <parameter name="store.producer.guaranteed.delivery.enable">false</parameter>
    <parameter name="store.rabbitmq.host.port">5672</parameter>
    <parameter name="store.rabbitmq.route.key">SafaricomRequestQueue</parameter>
    <parameter name="store.rabbitmq.username">username</parameter>
    <parameter name="store.rabbitmq.virtual.host">host</parameter>
    <parameter name="rabbitmq.connection.ssl.enabled">false</parameter>
    <parameter name="store.rabbitmq.exchange.name">amq.direct</parameter>
    <parameter name="store.rabbitmq.queue.name">SafaricomRequestQueueTest4</parameter>
    <parameter name="store.rabbitmq.password">password</parameter>
</messageStore> 

传入请求:

{
    "result": {
        "message": {
            "M-PESA_Cash_Out": "10.00",
            "M-PESA_Fee": "15.27",
            "Transaction_Fee": "0.44",
            "acct_no": "4000000009",
            "date": "04-JAN-2022",
            "power_id": "cab48a3b620e4b3f8d7c2d13a9efced7",
            "receiving_mobile": "799999999",
            "time": "17:47:29",
            "total_amount": "25.71",
            "transaction_id": "1002201040000053",
            "wallet_bank": "M-PESA"
        },
        "new_txn_id": 1002201040000053,
        "status": "S"
    }
}

消息在 rabbitMQ 中 posted 如下所示

当我使用 base64 decoder online 解码上面的屏幕截图值时,得到下面的值。

如果您注意到上面的屏幕截图,除了上面标记为黄色的 posted 有效载荷外,我还得到了一些其他值。

通过使用入站端点,我尝试从队列中获取消息,这将按顺序打印如下消息结构。

{
"text":"base64 encoded value...."
}

我可以知道为什么除了 posted 有效负载之外它还有其他额外的值吗?或者我该怎么做才能仅从队列中获取 posted 负载?

根据 tmoasz 在评论中的建议,修改了下面提到的代码。

修改后的API:

<?xml version="1.0" encoding="UTF-8"?>
<api context="/rabbitmqtest" name="RabbitMQTestAPI" xmlns="http://ws.apache.org/ns/synapse">
    <resource methods="POST">
        <inSequence>
         <log level="custom">
         <property name="RabbitMQTestAPI" value="is called***"/>
         <property name="IncomingRequest" expression="json-eval($)"/>
         </log>
            <property description="Initiate asynchronous mediation flow" name="OUT_ONLY" scope="default" type="STRING" value="true"/>
            <property description="Generate 202 response from mediation flow" name="FORCE_SC_ACCEPTED" scope="axis2" type="STRING" value="true"/>
            <send description="Publish request payload to AMQP endpoint">
                <endpoint key="RabbitMQ-QueueEP"/>
            </send>
        
        </inSequence>
        <outSequence/>
        <faultSequence/>
    </resource>
</api>

端点代码:

<?xml version="1.0" encoding="UTF-8"?>
<endpoint name="RabbitMQ-QueueEP" xmlns="http://ws.apache.org/ns/synapse">
    <address uri="rabbitmq:/RabbitMQTestQueue?rabbitmq.server.host.name=puffin.rmq2.cloudamqp.com&amp;rabbitmq.server.port=5672&amp;rabbitmq.server.user.name=username&amp;rabbitmq.server.password=password&amp;rabbitmq.queue.name=RabbitMQTestQueue&amp;rabbitmq.exchange.name=amq.direct">
        <suspendOnFailure>
            <initialDuration>-1</initialDuration>
            <progressionFactor>-1</progressionFactor>
        </suspendOnFailure>
        <markForSuspension>
            <retriesBeforeSuspension>0</retriesBeforeSuspension>
        </markForSuspension>
    </address>
</endpoint>

线路日志:

    [2022-01-06 19:01:11,282] DEBUG {org.apache.synapse.transport.http.wire} - HTTP-Listener I/O dispatcher-4 >> "POST /rabbitmqtest HTTP/1.1[\r][\n]"
[2022-01-06 19:01:11,283] DEBUG {org.apache.synapse.transport.http.wire} - HTTP-Listener I/O dispatcher-4 >> "Content-Type: application/json[\r][\n]"
[2022-01-06 19:01:11,285] DEBUG {org.apache.synapse.transport.http.wire} - HTTP-Listener I/O dispatcher-4 >> "User-Agent: PostmanRuntime/7.28.4[\r][\n]"
[2022-01-06 19:01:11,286] DEBUG {org.apache.synapse.transport.http.wire} - HTTP-Listener I/O dispatcher-4 >> "Accept: */*[\r][\n]"
[2022-01-06 19:01:11,289] DEBUG {org.apache.synapse.transport.http.wire} - HTTP-Listener I/O dispatcher-4 >> "Cache-Control: no-cache[\r][\n]"
[2022-01-06 19:01:11,290] DEBUG {org.apache.synapse.transport.http.wire} - HTTP-Listener I/O dispatcher-4 >> "Postman-Token: 75a4502d-7df0-444e-8ba8-39bcd6265204[\r][\n]"
[2022-01-06 19:01:11,290] DEBUG {org.apache.synapse.transport.http.wire} - HTTP-Listener I/O dispatcher-4 >> "Host: localhost:8280[\r][\n]"
[2022-01-06 19:01:11,291] DEBUG {org.apache.synapse.transport.http.wire} - HTTP-Listener I/O dispatcher-4 >> "Accept-Encoding: gzip, deflate, br[\r][\n]"
[2022-01-06 19:01:11,292] DEBUG {org.apache.synapse.transport.http.wire} - HTTP-Listener I/O dispatcher-4 >> "Connection: keep-alive[\r][\n]"
[2022-01-06 19:01:11,292] DEBUG {org.apache.synapse.transport.http.wire} - HTTP-Listener I/O dispatcher-4 >> "Content-Length: 25[\r][\n]"
[2022-01-06 19:01:11,293] DEBUG {org.apache.synapse.transport.http.wire} - HTTP-Listener I/O dispatcher-4 >> "[\r][\n]"
[2022-01-06 19:01:11,300] DEBUG {org.apache.synapse.transport.http.wire} - HTTP-Listener I/O dispatcher-4 >> "{[\r][\n]"
[2022-01-06 19:01:11,301] DEBUG {org.apache.synapse.transport.http.wire} - HTTP-Listener I/O dispatcher-4 >> "    "name":"justin"[\r][\n]"
[2022-01-06 19:01:11,302] DEBUG {org.apache.synapse.transport.http.wire} - HTTP-Listener I/O dispatcher-4 >> "}"
[2022-01-06 19:01:11,306]  INFO {org.apache.synapse.mediators.builtin.LogMediator} - RabbitMQTestAPI = is called***, IncomingRequest = {
    "name":"justin"
}
[2022-01-06 19:01:11,309]  INFO {org.apache.axis2.transport.rabbitmq.RabbitMQConnectionFactory} - Initializing channel pool of 20
[2022-01-06 19:01:11,879] ERROR {org.apache.axis2.transport.rabbitmq.RabbitMQConnectionFactory} - [puffin.rmq2.cloudamqp.com_5672_username_passwordI_null_null_null_null_null_null_null_null_null] Error creating connection to RabbitMQ Broker. Reattempting to connect. java.io.IOException
        at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:105)
        at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:101)
        at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:123)
        at com.rabbitmq.client.impl.AMQConnection.start(AMQConnection.java:382)
        at com.rabbitmq.client.impl.recovery.RecoveryAwareAMQConnectionFactory.newConnection(RecoveryAwareAMQConnectionFactory.java:58)
        at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.init(AutorecoveringConnection.java:103)
        at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:877)
        at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:839)
        at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:661)
        at org.apache.axis2.transport.rabbitmq.utils.RabbitMQUtils.createConnection(RabbitMQUtils.java:56)
        at org.apache.axis2.transport.rabbitmq.RabbitMQConnectionFactory.createConnection(RabbitMQConnectionFactory.java:159)
        at org.apache.axis2.transport.rabbitmq.RMQChannelPool.<init>(RMQChannelPool.java:20)
        at org.apache.axis2.transport.rabbitmq.RabbitMQConnectionFactory.initializeConnectionPool(RabbitMQConnectionFactory.java:392)
        at org.apache.axis2.transport.rabbitmq.RabbitMQConnectionFactoryManager.getConnectionFactory(RabbitMQConnectionFactoryManager.java:105)
        at org.apache.axis2.transport.rabbitmq.RabbitMQSender.getConnectionFactory(RabbitMQSender.java:135)
        at org.apache.axis2.transport.rabbitmq.RabbitMQSender.sendMessage(RabbitMQSender.java:81)
        at org.apache.axis2.transport.base.AbstractTransportSender.invoke(AbstractTransportSender.java:112)
        at org.apache.axis2.engine.AxisEngine.send(AxisEngine.java:442)
        at org.apache.axis2.description.OutOnlyAxisOperationClient.executeImpl(OutOnlyAxisOperation.java:297)
        at org.apache.axis2.client.OperationClient.execute(OperationClient.java:149)
        at org.apache.synapse.core.axis2.Axis2FlexibleMEPClient.send(Axis2FlexibleMEPClient.java:634)
        at org.apache.synapse.core.axis2.Axis2Sender.sendOn(Axis2Sender.java:85)
        at org.apache.synapse.core.axis2.Axis2SynapseEnvironment.send(Axis2SynapseEnvironment.java:571)
        at org.apache.synapse.endpoints.AbstractEndpoint.send(AbstractEndpoint.java:408)
        at org.apache.synapse.endpoints.AddressEndpoint.send(AddressEndpoint.java:74)
        at org.apache.synapse.endpoints.IndirectEndpoint.send(IndirectEndpoint.java:56)
        at org.apache.synapse.mediators.builtin.SendMediator.mediate(SendMediator.java:123)
        at org.apache.synapse.mediators.AbstractListMediator.mediate(AbstractListMediator.java:109)
        at org.apache.synapse.mediators.AbstractListMediator.mediate(AbstractListMediator.java:71)
        at org.apache.synapse.mediators.base.SequenceMediator.mediate(SequenceMediator.java:158)
        at org.apache.synapse.rest.Resource.process(Resource.java:331)
        at org.apache.synapse.rest.API.process(API.java:441)
        at org.apache.synapse.rest.RESTRequestHandler.apiProcess(RESTRequestHandler.java:135)
        at org.apache.synapse.rest.RESTRequestHandler.dispatchToAPI(RESTRequestHandler.java:113)
        at org.apache.synapse.rest.RESTRequestHandler.process(RESTRequestHandler.java:71)
        at org.apache.synapse.core.axis2.Axis2SynapseEnvironment.injectMessage(Axis2SynapseEnvironment.java:327)
        at org.apache.synapse.core.axis2.SynapseMessageReceiver.receive(SynapseMessageReceiver.java:98)
        at org.apache.axis2.engine.AxisEngine.receive(AxisEngine.java:180)
        at org.apache.synapse.transport.passthru.ServerWorker.processNonEntityEnclosingRESTHandler(ServerWorker.java:368)
        at org.apache.synapse.transport.passthru.ServerWorker.processEntityEnclosingRequest(ServerWorker.java:427)
        at org.apache.synapse.transport.passthru.ServerWorker.run(ServerWorker.java:182)
        at org.apache.axis2.transport.base.threads.NativeWorkerPool.run(NativeWorkerPool.java:172)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: com.rabbitmq.client.ShutdownSignalException: connection error; protocol method: #method<connection.close>(reply-code=530, reply-text=NOT_ALLOWED - access to vhost '/' refused for user 'username', class-id=10, method-id=40)
        at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:66)
        at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:32)
        at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:366)
        at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:229)
        at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:117)
        ... 42 more

如果要发送 JSON 消息作为负载到 RabbitMQ,不要使用消息存储。消息存储正在存储所有带有有效负载的突触消息上下文以供以后处理——这就是它们的目的。这就是为什么您在解码的 RabbitMQ 负载中看到的不仅是消息,还有其他属性。

要发送 JSON 到 RabbitMQ,您可以只使用 <send/> 到正确的 endpoint。 看成这个 documentation sample。 我也做了一些 sequance template 以便更容易地使用发送消息到 rabitmq,所以接下来你可能 look at this 对 post 有帮助,并适应你自己的需要。

我已经按照@tmoasz 的建议使用发送中介而不是消息存储进行端点调用,从而解决了上述问题。

API代码:

<?xml version="1.0" encoding="UTF-8"?>
<api context="/rabbitmqtest" name="RabbitMQTestAPI" xmlns="http://ws.apache.org/ns/synapse">
    <resource methods="POST">
        <inSequence>
         <log level="custom">
         <property name="RabbitMQTestAPI" value="is called***"/>
         <property name="IncomingRequest" expression="json-eval($)"/>
         </log>
            <property description="Initiate asynchronous mediation flow" name="OUT_ONLY" scope="default" type="STRING" value="true"/>
            <property description="Generate 202 response from mediation flow" name="FORCE_SC_ACCEPTED" scope="axis2" type="STRING" value="true"/>
            <send description="Publish request payload to AMQP endpoint">
                <endpoint key="RabbitMQ-QueueEP"/>
            </send>
        
        </inSequence>
        <outSequence/>
        <faultSequence/>
    </resource>
</api>

RabbitMQ-QueueEP :

<?xml version="1.0" encoding="UTF-8"?>
<endpoint name="RabbitMQ-QueueEP" xmlns="http://ws.apache.org/ns/synapse">
    <address uri="rabbitmq:/AMQPConnectionFactory?rabbitmq.server.host.name=puffin.rmq2.cloudamqp.com&amp;rabbitmq.server.port=5672&amp;rabbitmq.server.user.name=username&amp;rabbitmq.server.password=password&amp;rabbitmq.queue.name=RabbitMQTestQueue&amp;rabbitmq.queue.routing.key=RabbitMQTestQueue&amp;rabbitmq.server.virtual.host=hostname&amp;rabbitmq.exchange.name=amq.direct">
        <suspendOnFailure>
            <initialDuration>-1</initialDuration>
            <progressionFactor>-1</progressionFactor>
        </suspendOnFailure>
        <markForSuspension>
            <retriesBeforeSuspension>0</retriesBeforeSuspension>
        </markForSuspension>
    </address>
</endpoint>

API命中:

RabbitMQ 控制台: