REST WSO2 卡夫卡
REST WSO2 Kafka
我已经在我的 Windows 工作站中安装了 WSO2 Integration Studio 版本 6.5.0,并使用 Kafka 消费者和生产者内置模板创建了一个项目。
api.xml:
<?xml version="1.0" encoding="UTF-8"?>
<api context="/publishweatherdata" name="WeatherDataPublishAPI" xmlns="http://ws.apache.org/ns/synapse">
<resource methods="POST">
<inSequence>
<kafkaTransport.init>
<bootstrapServers>localhost:9092</bootstrapServers>
<keySerializerClass>org.apache.kafka.common.serialization.StringSerializer</keySerializerClass>
<valueSerializerClass>org.apache.kafka.common.serialization.StringSerializer</valueSerializerClass>
<acks>all</acks>
<requestTimeout>10000</requestTimeout>
<timeout>8000</timeout>
<metadataFetchTimeout>5000</metadataFetchTimeout>
<maxPoolSize>50</maxPoolSize>
</kafkaTransport.init>
<kafkaTransport.publishMessages>
<topic>weatherdatatopic</topic>
</kafkaTransport.publishMessages>
<payloadFactory media-type="json">
<format>
{"topic":"", "partition":"", "offset":""}
</format>
<args>
<arg evaluator="xml" expression="$ctx:topic"/>
<arg evaluator="xml" expression="$ctx:partition"/>
<arg evaluator="xml" expression="$ctx:offset"/>
</args>
</payloadFactory>
<property name="messageType" scope="axis2" type="STRING" value="application/json"/>
<respond/>
</inSequence>
<outSequence/>
<faultSequence/>
</resource>
</api>
weatherdatatransmitinboundEP.xml:
<?xml version="1.0" encoding="UTF-8"?>
<inboundEndpoint class="org.wso2.carbon.inbound.kafka.KafkaMessageConsumer" name="WeatherDataTransmitInboundEP" onError="WeatherDataErrorSeq" sequence="WeatherDataProcessSeq" suspend="false" xmlns="http://ws.apache.org/ns/synapse">
<parameters>
<parameter name="sequential">true</parameter>
<parameter name="interval">10</parameter>
<parameter name="coordination">true</parameter>
<parameter name="inbound.behavior">polling</parameter>
<parameter name="value.deserializer">org.apache.kafka.common.serialization.StringDeserializer</parameter>
<parameter name="topic.name">weatherdatatopic</parameter>
<parameter name="poll.timeout">100</parameter>
<parameter name="bootstrap.servers">localhost:9092</parameter>
<parameter name="group.id">hello</parameter>
<parameter name="contentType">application/json</parameter>
<parameter name="key.deserializer">org.apache.kafka.common.serialization.StringDeserializer</parameter>
<parameter name="class">org.wso2.carbon.inbound.kafka.KafkaMessageConsumer</parameter>
</parameters>
</inboundEndpoint>
WeatherDataPublishService.xml:
<?xml version="1.0" encoding="UTF-8"?>
<proxy name="WeatherDataPublishService" startOnLoad="true" transports="http https" xmlns="http://ws.apache.org/ns/synapse">
<target>
<inSequence>
<kafkaTransport.init>
<bootstrapServers>localhost:9092</bootstrapServers>
<keySerializerClass>org.apache.kafka.common.serialization.StringSerializer</keySerializerClass>
<valueSerializerClass>org.apache.kafka.common.serialization.StringSerializer</valueSerializerClass>
<acks>all</acks>
<requestTimeout>10000</requestTimeout>
<timeout>8000</timeout>
<metadataFetchTimeout>5000</metadataFetchTimeout>
<maxPoolSize>50</maxPoolSize>
</kafkaTransport.init>
<kafkaTransport.publishMessages>
<topic>weatherdatatopic</topic>
</kafkaTransport.publishMessages>
<payloadFactory media-type="json">
<format>
{"topic":"", "partition":"", "offset":""}
</format>
<args>
<arg evaluator="xml" expression="$ctx:topic"/>
<arg evaluator="xml" expression="$ctx:partition"/>
<arg evaluator="xml" expression="$ctx:offset"/>
</args>
</payloadFactory>
<property name="messageType" scope="axis2" type="STRING" value="application/json"/>
<respond/>
</inSequence>
<outSequence/>
<faultSequence/>
</target>
</proxy>
现在我可以向 http://localhost:8290/publishweatherdata 发送 post 请求并在 kafka 主题中获取它。我也可以在 wso2 中收到 kafka 生成的消息。如何从 wso2 向外部服务发送消息?我想我应该使用
<endpoint [name="string"] [key="string"]>
address-endpoint | default-endpoint | wsdl-endpoint | load-balanced-endpoint | fail-over-endpoint
</endpoint>
但我不知道必须在哪里添加它以及如何配置
您可以使用调用调解器 [1] 或发送调解器 [2] 来实现您的用例。在调解器中,您可以定义要调用的所需端点。请参考以下示例配置。这里我们使用了调用中介来调用外部端点 http://run.mocky.io/v3/9cf4b844-57c1-4fa5-a101-881dc36385bd.
<call>
<endpoint>
<address uri="http://run.mocky.io/v3/9cf4b844-57c1-4fa5-a101-881dc36385bd"/>
</endpoint>
</call>
在您的用例中,如果您在有效载荷工厂调解器之后完成了所需有效载荷的构建,则可以在有效载荷工厂调解器之后使用调用调解器来调用外部端点。此处由有效负载工厂中介构建的有效负载将用于调用外部端点。
<payloadFactory media-type="json">
<format>
{"topic":"", "partition":"", "offset":""}
</format>
<args>
<arg evaluator="xml" expression="$ctx:topic"/>
<arg evaluator="xml" expression="$ctx:partition"/>
<arg evaluator="xml" expression="$ctx:offset"/>
</args>
</payloadFactory>
<call>
<endpoint>
<address uri="http://run.mocky.io/v3/9cf4b844-57c1-4fa5-a101-881dc36385bd"/>
</endpoint>
</call>
<property name="messageType" scope="axis2" type="STRING" value="application/json"/>
<respond/>
此外,对于WeatherDataTransmitInboundEP(inbound endpoint),它会读取发布到Kafka的消息,然后将消息发送到inbound endpoint中定义的序列。如果要将 WeatherDataTransmitInboundEP 使用的消息发送到外部端点,则必须采用不同的方法。
在您的例子中,WeatherDataProcessSeq 在读取来自 Kafka 的消息后被调用。因此,如果您的要求是在 Kafka 中发送消息,则需要在 WeatherDataProcessSeq 中定义调用或发送中介。
如果您想进一步了解 call/send 调解员,请参阅博客 post [3]。
[1]-https://docs.wso2.com/display/EI6xx/Call+Mediator
[2]-https://docs.wso2.com/display/EI600/Send+Mediator
[3]-https://www.yenlo.com/blog/wso2torial-to-send-or-not-to-send-that-is-your-choice
我刚加
<send>
<endpoint>
<http method="post" statistics="enable" trace="enable" uri-template="http://localhost:8081/api">
<property name="name" scope="axis2" value="messageValue"/>
<suspendOnFailure>
<initialDuration>-1</initialDuration>
<progressionFactor>-1</progressionFactor>
<maximumDuration>0</maximumDuration>
</suspendOnFailure>
<markForSuspension>
<retriesBeforeSuspension>0</retriesBeforeSuspension>
</markForSuspension>
</http>
</endpoint>
</send>
排序并得到我想要的
我已经在我的 Windows 工作站中安装了 WSO2 Integration Studio 版本 6.5.0,并使用 Kafka 消费者和生产者内置模板创建了一个项目。
api.xml:
<?xml version="1.0" encoding="UTF-8"?>
<api context="/publishweatherdata" name="WeatherDataPublishAPI" xmlns="http://ws.apache.org/ns/synapse">
<resource methods="POST">
<inSequence>
<kafkaTransport.init>
<bootstrapServers>localhost:9092</bootstrapServers>
<keySerializerClass>org.apache.kafka.common.serialization.StringSerializer</keySerializerClass>
<valueSerializerClass>org.apache.kafka.common.serialization.StringSerializer</valueSerializerClass>
<acks>all</acks>
<requestTimeout>10000</requestTimeout>
<timeout>8000</timeout>
<metadataFetchTimeout>5000</metadataFetchTimeout>
<maxPoolSize>50</maxPoolSize>
</kafkaTransport.init>
<kafkaTransport.publishMessages>
<topic>weatherdatatopic</topic>
</kafkaTransport.publishMessages>
<payloadFactory media-type="json">
<format>
{"topic":"", "partition":"", "offset":""}
</format>
<args>
<arg evaluator="xml" expression="$ctx:topic"/>
<arg evaluator="xml" expression="$ctx:partition"/>
<arg evaluator="xml" expression="$ctx:offset"/>
</args>
</payloadFactory>
<property name="messageType" scope="axis2" type="STRING" value="application/json"/>
<respond/>
</inSequence>
<outSequence/>
<faultSequence/>
</resource>
</api>
weatherdatatransmitinboundEP.xml:
<?xml version="1.0" encoding="UTF-8"?>
<inboundEndpoint class="org.wso2.carbon.inbound.kafka.KafkaMessageConsumer" name="WeatherDataTransmitInboundEP" onError="WeatherDataErrorSeq" sequence="WeatherDataProcessSeq" suspend="false" xmlns="http://ws.apache.org/ns/synapse">
<parameters>
<parameter name="sequential">true</parameter>
<parameter name="interval">10</parameter>
<parameter name="coordination">true</parameter>
<parameter name="inbound.behavior">polling</parameter>
<parameter name="value.deserializer">org.apache.kafka.common.serialization.StringDeserializer</parameter>
<parameter name="topic.name">weatherdatatopic</parameter>
<parameter name="poll.timeout">100</parameter>
<parameter name="bootstrap.servers">localhost:9092</parameter>
<parameter name="group.id">hello</parameter>
<parameter name="contentType">application/json</parameter>
<parameter name="key.deserializer">org.apache.kafka.common.serialization.StringDeserializer</parameter>
<parameter name="class">org.wso2.carbon.inbound.kafka.KafkaMessageConsumer</parameter>
</parameters>
</inboundEndpoint>
WeatherDataPublishService.xml:
<?xml version="1.0" encoding="UTF-8"?>
<proxy name="WeatherDataPublishService" startOnLoad="true" transports="http https" xmlns="http://ws.apache.org/ns/synapse">
<target>
<inSequence>
<kafkaTransport.init>
<bootstrapServers>localhost:9092</bootstrapServers>
<keySerializerClass>org.apache.kafka.common.serialization.StringSerializer</keySerializerClass>
<valueSerializerClass>org.apache.kafka.common.serialization.StringSerializer</valueSerializerClass>
<acks>all</acks>
<requestTimeout>10000</requestTimeout>
<timeout>8000</timeout>
<metadataFetchTimeout>5000</metadataFetchTimeout>
<maxPoolSize>50</maxPoolSize>
</kafkaTransport.init>
<kafkaTransport.publishMessages>
<topic>weatherdatatopic</topic>
</kafkaTransport.publishMessages>
<payloadFactory media-type="json">
<format>
{"topic":"", "partition":"", "offset":""}
</format>
<args>
<arg evaluator="xml" expression="$ctx:topic"/>
<arg evaluator="xml" expression="$ctx:partition"/>
<arg evaluator="xml" expression="$ctx:offset"/>
</args>
</payloadFactory>
<property name="messageType" scope="axis2" type="STRING" value="application/json"/>
<respond/>
</inSequence>
<outSequence/>
<faultSequence/>
</target>
</proxy>
现在我可以向 http://localhost:8290/publishweatherdata 发送 post 请求并在 kafka 主题中获取它。我也可以在 wso2 中收到 kafka 生成的消息。如何从 wso2 向外部服务发送消息?我想我应该使用
<endpoint [name="string"] [key="string"]>
address-endpoint | default-endpoint | wsdl-endpoint | load-balanced-endpoint | fail-over-endpoint
</endpoint>
但我不知道必须在哪里添加它以及如何配置
您可以使用调用调解器 [1] 或发送调解器 [2] 来实现您的用例。在调解器中,您可以定义要调用的所需端点。请参考以下示例配置。这里我们使用了调用中介来调用外部端点 http://run.mocky.io/v3/9cf4b844-57c1-4fa5-a101-881dc36385bd.
<call>
<endpoint>
<address uri="http://run.mocky.io/v3/9cf4b844-57c1-4fa5-a101-881dc36385bd"/>
</endpoint>
</call>
在您的用例中,如果您在有效载荷工厂调解器之后完成了所需有效载荷的构建,则可以在有效载荷工厂调解器之后使用调用调解器来调用外部端点。此处由有效负载工厂中介构建的有效负载将用于调用外部端点。
<payloadFactory media-type="json">
<format>
{"topic":"", "partition":"", "offset":""}
</format>
<args>
<arg evaluator="xml" expression="$ctx:topic"/>
<arg evaluator="xml" expression="$ctx:partition"/>
<arg evaluator="xml" expression="$ctx:offset"/>
</args>
</payloadFactory>
<call>
<endpoint>
<address uri="http://run.mocky.io/v3/9cf4b844-57c1-4fa5-a101-881dc36385bd"/>
</endpoint>
</call>
<property name="messageType" scope="axis2" type="STRING" value="application/json"/>
<respond/>
此外,对于WeatherDataTransmitInboundEP(inbound endpoint),它会读取发布到Kafka的消息,然后将消息发送到inbound endpoint中定义的序列。如果要将 WeatherDataTransmitInboundEP 使用的消息发送到外部端点,则必须采用不同的方法。 在您的例子中,WeatherDataProcessSeq 在读取来自 Kafka 的消息后被调用。因此,如果您的要求是在 Kafka 中发送消息,则需要在 WeatherDataProcessSeq 中定义调用或发送中介。
如果您想进一步了解 call/send 调解员,请参阅博客 post [3]。
[1]-https://docs.wso2.com/display/EI6xx/Call+Mediator [2]-https://docs.wso2.com/display/EI600/Send+Mediator [3]-https://www.yenlo.com/blog/wso2torial-to-send-or-not-to-send-that-is-your-choice
我刚加
<send>
<endpoint>
<http method="post" statistics="enable" trace="enable" uri-template="http://localhost:8081/api">
<property name="name" scope="axis2" value="messageValue"/>
<suspendOnFailure>
<initialDuration>-1</initialDuration>
<progressionFactor>-1</progressionFactor>
<maximumDuration>0</maximumDuration>
</suspendOnFailure>
<markForSuspension>
<retriesBeforeSuspension>0</retriesBeforeSuspension>
</markForSuspension>
</http>
</endpoint>
</send>
排序并得到我想要的