例如,在订阅流表时,如果有任何方法可以直接指定msg作为produce的参数。
6yjfywim1#
是的,当然可以。请尝试以下脚本:
loadPlugin("/yourpath/PluginKafka.txt") producerCfg = dict(STRING, ANY); producerCfg["metadata.broker.list"] = "localhost:9092"; producer = kafka::producer(producerCfg); def sendMsgToKafkaFunc(producer, msg){ try { kafka::produce(producer, "dolphindb-producer-test", 1, msg, true) } catch(ex) { writeLog("[Kafka Plugin] Failed to send msg to kafka with error:" +ex) } } subscribeTable(tableName="kafkaTest", actionName="sendMsgToKafka", offset=0, handler=sendMsgToKafkaFunc{producer}, msgAsTable=true, reconnect=true)
Kafka所使用的信息:
... {"id":[1,1,...],"datetime":["2020.01.01T10:55:12","2020.01.01T10:55:13",...],"val":[73,74,...]} {"id":[1,1,...],"datetime":["2020.01.01T23:55:12","2020.01.01T23:55:13",...],"val":[88,1,...]} ...
1条答案
按热度按时间6yjfywim1#
是的,当然可以。请尝试以下脚本:
Kafka所使用的信息: