rest wso2 Kafka

djmepvbi, posted on 2021-06-04 in Kafka

I have installed WSO2 Integration Studio 6.5.0 on a Windows workstation and created a project from the built-in Kafka consumer and producer template.
api.xml file:

<?xml version="1.0" encoding="UTF-8"?>
<api context="/publishweatherdata" name="WeatherDataPublishAPI" xmlns="http://ws.apache.org/ns/synapse">
    <resource methods="POST">
        <inSequence>
            <kafkaTransport.init>
                <bootstrapServers>localhost:9092</bootstrapServers>
                <keySerializerClass>org.apache.kafka.common.serialization.StringSerializer</keySerializerClass>
                <valueSerializerClass>org.apache.kafka.common.serialization.StringSerializer</valueSerializerClass>
                <acks>all</acks>
                <requestTimeout>10000</requestTimeout>
                <timeout>8000</timeout>
                <metadataFetchTimeout>5000</metadataFetchTimeout>
                <maxPoolSize>50</maxPoolSize>
            </kafkaTransport.init>
            <kafkaTransport.publishMessages>
                <topic>weatherdatatopic</topic>
            </kafkaTransport.publishMessages>
            <payloadFactory media-type="json">
                <format>
                    {"topic":"$1", "partition":"$2", "offset":"$3"}
                </format>
                <args>
                    <arg evaluator="xml" expression="$ctx:topic"/>
                    <arg evaluator="xml" expression="$ctx:partition"/>
                    <arg evaluator="xml" expression="$ctx:offset"/>
                </args>
            </payloadFactory>
            <property name="messageType" scope="axis2" type="STRING" value="application/json"/>
            <respond/>
        </inSequence>
        <outSequence/>
        <faultSequence/>
    </resource>
</api>

weatherdatatransmitinboundep.xml file:

<?xml version="1.0" encoding="UTF-8"?>
<inboundEndpoint class="org.wso2.carbon.inbound.kafka.KafkaMessageConsumer" name="WeatherDataTransmitInboundEP" onError="WeatherDataErrorSeq" sequence="WeatherDataProcessSeq" suspend="false" xmlns="http://ws.apache.org/ns/synapse">
    <parameters>
        <parameter name="sequential">true</parameter>
        <parameter name="interval">10</parameter>
        <parameter name="coordination">true</parameter>
        <parameter name="inbound.behavior">polling</parameter>
        <parameter name="value.deserializer">org.apache.kafka.common.serialization.StringDeserializer</parameter>
        <parameter name="topic.name">weatherdatatopic</parameter>
        <parameter name="poll.timeout">100</parameter>
        <parameter name="bootstrap.servers">localhost:9092</parameter>
        <parameter name="group.id">hello</parameter>
        <parameter name="contentType">application/json</parameter>
        <parameter name="key.deserializer">org.apache.kafka.common.serialization.StringDeserializer</parameter>
        <parameter name="class">org.wso2.carbon.inbound.kafka.KafkaMessageConsumer</parameter>
    </parameters>

</inboundEndpoint>

weatherdatapublishservice.xml:

<?xml version="1.0" encoding="UTF-8"?>
<proxy name="WeatherDataPublishService" startOnLoad="true" transports="http https" xmlns="http://ws.apache.org/ns/synapse">
    <target>
        <inSequence>
            <kafkaTransport.init>
                <bootstrapServers>localhost:9092</bootstrapServers>
                <keySerializerClass>org.apache.kafka.common.serialization.StringSerializer</keySerializerClass>
                <valueSerializerClass>org.apache.kafka.common.serialization.StringSerializer</valueSerializerClass>
                <acks>all</acks>
                <requestTimeout>10000</requestTimeout>
                <timeout>8000</timeout>
                <metadataFetchTimeout>5000</metadataFetchTimeout>
                <maxPoolSize>50</maxPoolSize>
            </kafkaTransport.init>
            <kafkaTransport.publishMessages>
                <topic>weatherdatatopic</topic>
            </kafkaTransport.publishMessages>
            <payloadFactory media-type="json">
                <format>
                    {"topic":"$1", "partition":"$2", "offset":"$3"}
                </format>
                <args>
                    <arg evaluator="xml" expression="$ctx:topic"/>
                    <arg evaluator="xml" expression="$ctx:partition"/>
                    <arg evaluator="xml" expression="$ctx:offset"/>
                </args>
            </payloadFactory>
            <property name="messageType" scope="axis2" type="STRING" value="application/json"/>
            <respond/>
        </inSequence>
        <outSequence/>
        <faultSequence/>
    </target>
</proxy>

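Now I can send a POST request to http://localhost:8290/publishweatherdata and see the message arrive in the Kafka topic. I can also receive messages produced to Kafka in WSO2. How do I send a message from WSO2 to an external service? I think I should use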

<endpoint [name="string"] [key="string"]>
        address-endpoint | default-endpoint | wsdl-endpoint | load-balanced-endpoint | fail-over-endpoint
</endpoint>

but I don't know where it has to be added or how to configure it.

mnowg1ta #1

I just added

<send>
    <endpoint>
        <http method="post" statistics="enable" trace="enable" uri-template="http://localhost:8081/api">
            <property name="name" scope="axis2" value="messageValue"/>
            <suspendOnFailure>
                <initialDuration>-1</initialDuration>
                <progressionFactor>-1</progressionFactor>
                <maximumDuration>0</maximumDuration>
            </suspendOnFailure>
            <markForSuspension>
                <retriesBeforeSuspension>0</retriesBeforeSuspension>
            </markForSuspension>
        </http>
    </endpoint>
</send>

to the sequence and got what I wanted.

cx6n0qe3 #2

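You can use the Call mediator [1] or the Send mediator [2] to implement your use case. Inside the mediator you define the endpoint that should be invoked. Please refer to the following sample configuration, which uses a Call mediator to invoke the external endpoint http://run.mocky.io/v3/9cf4b844-57c1-4fa5-a101-881dc36385bd.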

<call>
    <endpoint>
        <address uri="http://run.mocky.io/v3/9cf4b844-57c1-4fa5-a101-881dc36385bd"/>
    </endpoint>
</call>
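For comparison, the same invocation with the Send mediator could look roughly like the sketch below. It assumes the mediator sits in an API resource like the one in the question and reuses the mock URL from above. The Send mediator is non-blocking, so the backend response is handled in the outSequence rather than in the mediators that follow it.

```xml
<resource methods="POST">
    <inSequence>
        <!-- Forward the current message to the external endpoint (non-blocking) -->
        <send>
            <endpoint>
                <address uri="http://run.mocky.io/v3/9cf4b844-57c1-4fa5-a101-881dc36385bd"/>
            </endpoint>
        </send>
    </inSequence>
    <outSequence>
        <!-- The backend response arrives here; a Send with no endpoint returns it to the client -->
        <send/>
    </outSequence>
    <faultSequence/>
</resource>
```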

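In your use case, if the required payload has already been built by the PayloadFactory mediator, you can place the Call mediator right after the PayloadFactory mediator to invoke the external endpoint. The payload generated by the PayloadFactory mediator is then what gets sent to the external endpoint.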

<payloadFactory media-type="json">
    <format>
        {"topic":"$1", "partition":"$2", "offset":"$3"}
    </format>
    <args>
        <arg evaluator="xml" expression="$ctx:topic"/>
        <arg evaluator="xml" expression="$ctx:partition"/>
        <arg evaluator="xml" expression="$ctx:offset"/>
    </args>
</payloadFactory>
<call>
    <endpoint>
        <address uri="http://run.mocky.io/v3/9cf4b844-57c1-4fa5-a101-881dc36385bd"/>
    </endpoint>
</call>
<property name="messageType" scope="axis2" type="STRING" value="application/json"/>
<respond/>
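The `key` attribute from the endpoint syntax quoted in the question can be used to keep the endpoint definition in its own artifact and only reference it from the mediator. A minimal sketch, assuming a hypothetical endpoint name `WeatherExternalEP` (not part of the original project), could be an endpoint artifact such as:

```xml
<?xml version="1.0" encoding="UTF-8"?>
<!-- WeatherExternalEP is a hypothetical name chosen for illustration -->
<endpoint name="WeatherExternalEP" xmlns="http://ws.apache.org/ns/synapse">
    <address uri="http://run.mocky.io/v3/9cf4b844-57c1-4fa5-a101-881dc36385bd"/>
</endpoint>
```

The Call mediator would then refer to it by name instead of defining the endpoint inline:

```xml
<call>
    <!-- References the named endpoint artifact defined above -->
    <endpoint key="WeatherExternalEP"/>
</call>
```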

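In addition, WeatherDataTransmitInboundEP (the inbound endpoint) reads the messages published to Kafka and passes them to the sequence defined in the inbound endpoint. If you want to send the messages consumed by WeatherDataTransmitInboundEP to an external endpoint, you need a different approach. In your case, WeatherDataProcessSeq is the sequence invoked after a message is read from Kafka. So if your requirement is to forward the messages consumed from Kafka, you need to define the Call or Send mediator in WeatherDataProcessSeq.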
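As a rough sketch of that second case, WeatherDataProcessSeq could look like the following. The target URI is only a placeholder borrowed from the other answer, and the Log and Drop mediators are optional additions for illustration, not something the original project is known to contain.

```xml
<?xml version="1.0" encoding="UTF-8"?>
<sequence name="WeatherDataProcessSeq" xmlns="http://ws.apache.org/ns/synapse">
    <!-- Optional: log the message consumed from Kafka for debugging -->
    <log level="full"/>
    <!-- Forward the consumed payload to the external service (placeholder URI) -->
    <call>
        <endpoint>
            <http method="post" uri-template="http://localhost:8081/api"/>
        </endpoint>
    </call>
    <!-- No client is waiting on an inbound endpoint flow, so the response is discarded -->
    <drop/>
</sequence>
```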
If you need further clarification on the Call and Send mediators, see the blog post [3].
[1] https://docs.wso2.com/display/ei6xx/call+mediator
[2] https://docs.wso2.com/display/ei600/send+mediator
[3] https://www.yenlo.com/blog/wso2torial-to-send-or-not-to-send-that-is-your-choice
