如何向kafka发送连续的http流数据?

neekobn8  于 2021-06-08  发布在  Kafka
关注(0)|答案(2)|浏览(451)

我们有一个通过http发送时间序列数据的系统(例如systema)。
我现在想把apachekafka引入这个系统。
我的想法是让kafka服务器运行,系统a在接收到时间序列消息后,立即将该消息发布到apachekafka服务器。
在这里,我有一个疑问,如何通过http协议发布时间序列数据?
下面是一个例子。
我有以下url来收集数据:
http://ipaddress:portno/httppush/track?事件
接收下列扫描数据
2017-08-17 00:00:21信息:?数据=$sensor,x88683236474805720170816235919,13,1,1,5 | 2,0,720170816235854,0,0,0128,04095 | 3,0,1020170816235854,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00 | 2,0,720170816235859,0,0,0128,04095
注意:我收到20kps的数据
有谁能帮我提供这个示例的kafka producerapi示例吗。

cig3rfwq

cig3rfwq1#

在这里,kafka rest代理可能很有用。源系统可以通过http将其数据直接流式传输到kafka。

wmomyfyw

wmomyfyw2#

您可以将url中的数据转换为字符串,并将该数据转换为将数据发布到kafka的方法。首先,用producer属性初始化producer(例如,请参阅下面的代码段)

producer = new KafkaProducer<String, String>(KafkaConfiguration.producerProperties);
            logger.info("JetKafkaProducer{" + this.hashCode() + "} initalised Successfully ,for the topic {" + topic
                    + "}");
            logger.debug("JetKafkaProducer with hash code: {" + this.hashCode() + "}, for the topic{" + topic
                    + "} has producer properties: " + KafkaConfiguration.producerProperties);

现在发布消息

public void publishMesssage(String data) throws Exception {
    if (getTopic() != null && data != null) {
        ProducerRecord<String, String> record = new ProducerRecord<String, String>(this.getTopic(), data);
        producer.send(record); // This publishes message on given topic
        logger.info("Published record to Kafka " + data.toString());
    } else {
        logger.error("Failed to publish record to " + getTopic() + " Data: " + data.toString());
    }
    return;
}

相关问题