REST WSO2 卡夫卡

REST WSO2 Kafka

我已经在我的 Windows 工作站中安装了 WSO2 Integration Studio 版本 6.5.0,并使用 Kafka 消费者和生产者内置模板创建了一个项目。

api.xml:

<?xml version="1.0" encoding="UTF-8"?>
<api context="/publishweatherdata" name="WeatherDataPublishAPI" xmlns="http://ws.apache.org/ns/synapse">
    <resource methods="POST">
        <inSequence>
            <kafkaTransport.init>
                <bootstrapServers>localhost:9092</bootstrapServers>
                <keySerializerClass>org.apache.kafka.common.serialization.StringSerializer</keySerializerClass>
                <valueSerializerClass>org.apache.kafka.common.serialization.StringSerializer</valueSerializerClass>
                <acks>all</acks>
                <requestTimeout>10000</requestTimeout>
                <timeout>8000</timeout>
                <metadataFetchTimeout>5000</metadataFetchTimeout>
                <maxPoolSize>50</maxPoolSize>
            </kafkaTransport.init>
            <kafkaTransport.publishMessages>
                <topic>weatherdatatopic</topic>
            </kafkaTransport.publishMessages>
            <payloadFactory media-type="json">
                <format>
                    {"topic":"", "partition":"", "offset":""}
                </format>
                <args>
                    <arg evaluator="xml" expression="$ctx:topic"/>
                    <arg evaluator="xml" expression="$ctx:partition"/>
                    <arg evaluator="xml" expression="$ctx:offset"/>
                </args>
            </payloadFactory>
            <property name="messageType" scope="axis2" type="STRING" value="application/json"/>
            <respond/>
        </inSequence>
        <outSequence/>
        <faultSequence/>
    </resource>
</api>

weatherdatatransmitinboundEP.xml:

<?xml version="1.0" encoding="UTF-8"?>
<inboundEndpoint class="org.wso2.carbon.inbound.kafka.KafkaMessageConsumer" name="WeatherDataTransmitInboundEP" onError="WeatherDataErrorSeq" sequence="WeatherDataProcessSeq" suspend="false" xmlns="http://ws.apache.org/ns/synapse">
    <parameters>
        <parameter name="sequential">true</parameter>
        <parameter name="interval">10</parameter>
        <parameter name="coordination">true</parameter>
        <parameter name="inbound.behavior">polling</parameter>
        <parameter name="value.deserializer">org.apache.kafka.common.serialization.StringDeserializer</parameter>
        <parameter name="topic.name">weatherdatatopic</parameter>
        <parameter name="poll.timeout">100</parameter>
        <parameter name="bootstrap.servers">localhost:9092</parameter>
        <parameter name="group.id">hello</parameter>
        <parameter name="contentType">application/json</parameter>
        <parameter name="key.deserializer">org.apache.kafka.common.serialization.StringDeserializer</parameter>
        <parameter name="class">org.wso2.carbon.inbound.kafka.KafkaMessageConsumer</parameter>
    </parameters>
    
</inboundEndpoint>

WeatherDataPublishService.xml:

<?xml version="1.0" encoding="UTF-8"?>
<proxy name="WeatherDataPublishService" startOnLoad="true" transports="http https" xmlns="http://ws.apache.org/ns/synapse">
    <target>
        <inSequence>
            <kafkaTransport.init>
                <bootstrapServers>localhost:9092</bootstrapServers>
                <keySerializerClass>org.apache.kafka.common.serialization.StringSerializer</keySerializerClass>
                <valueSerializerClass>org.apache.kafka.common.serialization.StringSerializer</valueSerializerClass>
                <acks>all</acks>
                <requestTimeout>10000</requestTimeout>
                <timeout>8000</timeout>
                <metadataFetchTimeout>5000</metadataFetchTimeout>
                <maxPoolSize>50</maxPoolSize>
            </kafkaTransport.init>
            <kafkaTransport.publishMessages>
                <topic>weatherdatatopic</topic>
            </kafkaTransport.publishMessages>
            <payloadFactory media-type="json">
                <format>
                    {"topic":"", "partition":"", "offset":""}
                </format>
                <args>
                    <arg evaluator="xml" expression="$ctx:topic"/>
                    <arg evaluator="xml" expression="$ctx:partition"/>
                    <arg evaluator="xml" expression="$ctx:offset"/>
                </args>
            </payloadFactory>
            <property name="messageType" scope="axis2" type="STRING" value="application/json"/>
            <respond/>
        </inSequence>
        <outSequence/>
        <faultSequence/>
    </target>
</proxy>

现在我可以向 http://localhost:8290/publishweatherdata 发送 post 请求并在 kafka 主题中获取它。我也可以在 wso2 中收到 kafka 生成的消息。如何从 wso2 向外部服务发送消息?我想我应该使用

<endpoint [name="string"] [key="string"]>
        address-endpoint | default-endpoint | wsdl-endpoint | load-balanced-endpoint | fail-over-endpoint
</endpoint>

但我不知道必须在哪里添加它以及如何配置

您可以使用调用调解器 [1] 或发送调解器 [2] 来实现您的用例。在调解器中,您可以定义要调用的所需端点。请参考以下示例配置。这里我们使用了调用中介来调用外部端点 http://run.mocky.io/v3/9cf4b844-57c1-4fa5-a101-881dc36385bd.

     <call>
        <endpoint>
           <address uri="http://run.mocky.io/v3/9cf4b844-57c1-4fa5-a101-881dc36385bd"/>
        </endpoint>
     </call>

在您的用例中,如果您在有效载荷工厂调解器之后完成了所需有效载荷的构建,则可以在有效载荷工厂调解器之后使用调用调解器来调用外部端点。此处由有效负载工厂中介构建的有效负载将用于调用外部端点。

    <payloadFactory media-type="json">
                <format>
                    {"topic":"", "partition":"", "offset":""}
                </format>
                <args>
                    <arg evaluator="xml" expression="$ctx:topic"/>
                    <arg evaluator="xml" expression="$ctx:partition"/>
                    <arg evaluator="xml" expression="$ctx:offset"/>
                </args>
            </payloadFactory>
            
             <call>
                <endpoint>
                   <address uri="http://run.mocky.io/v3/9cf4b844-57c1-4fa5-a101-881dc36385bd"/>
                </endpoint>
             </call>
<property name="messageType" scope="axis2" type="STRING" value="application/json"/>
            <respond/>

此外,对于WeatherDataTransmitInboundEP(inbound endpoint),它会读取发布到Kafka的消息,然后将消息发送到inbound endpoint中定义的序列。如果要将 WeatherDataTransmitInboundEP 使用的消息发送到外部端点,则必须采用不同的方法。 在您的例子中,WeatherDataProcessSeq 在读取来自 Kafka 的消息后被调用。因此,如果您的要求是在 Kafka 中发送消息,则需要在 WeatherDataProcessSeq 中定义调用或发送中介。

如果您想进一步了解 call/send 调解员,请参阅博客 post [3]。

[1]-https://docs.wso2.com/display/EI6xx/Call+Mediator [2]-https://docs.wso2.com/display/EI600/Send+Mediator [3]-https://www.yenlo.com/blog/wso2torial-to-send-or-not-to-send-that-is-your-choice

我刚加

<send>
        <endpoint>
            <http method="post" statistics="enable" trace="enable" uri-template="http://localhost:8081/api">
            <property name="name" scope="axis2" value="messageValue"/>
                <suspendOnFailure>
                    <initialDuration>-1</initialDuration>
                    <progressionFactor>-1</progressionFactor>
                    <maximumDuration>0</maximumDuration>
                </suspendOnFailure>
                <markForSuspension>
                    <retriesBeforeSuspension>0</retriesBeforeSuspension>
                </markForSuspension>
            </http>
            
        </endpoint>
    </send>

排序并得到我想要的