apache-kafka 我可以订阅一个表并将其发布给Kafka producer吗?

wkyowqbh  于 2022-11-01  发布在  Apache
关注(0)|答案(1)|浏览(138)

例如,在订阅流表时,如果有任何方法可以直接指定msg作为produce的参数。

6yjfywim

6yjfywim1#

是的,当然可以。请尝试以下脚本:

loadPlugin("/yourpath/PluginKafka.txt")

producerCfg = dict(STRING, ANY);
producerCfg["metadata.broker.list"] = "localhost:9092";
producer = kafka::producer(producerCfg);

def sendMsgToKafkaFunc(producer, msg){
    try {
        kafka::produce(producer, "dolphindb-producer-test", 1, msg, true)
    }
    catch(ex) {
        writeLog("[Kafka Plugin] Failed to send msg to kafka with error:" +ex)
    }
}

subscribeTable(tableName="kafkaTest", actionName="sendMsgToKafka", offset=0, handler=sendMsgToKafkaFunc{producer}, msgAsTable=true, reconnect=true)

Kafka所使用的信息:

...
{"id":[1,1,...],"datetime":["2020.01.01T10:55:12","2020.01.01T10:55:13",...],"val":[73,74,...]}
{"id":[1,1,...],"datetime":["2020.01.01T23:55:12","2020.01.01T23:55:13",...],"val":[88,1,...]}
...

相关问题