为kafka编译定制生产者

eaf3rand  于 2023-01-14  发布在  Kafka
关注(0)|答案(1)|浏览(354)

我从网上下载了Kafkahttps://www.apache.org/dyn/closer.cgi?path=/kafka/0.8.1.1/kafka_2.8.0-0.8.1.1.tgz并使用vms在我的机器上设置kafka集群。集群运行良好-它是使用kafka包提供的控制台生产者和消费者进行测试的。
现在,我已经为kafka实现了一个定制的producer类。但是我不知道如何编译这个类以及依赖关系是什么。
问题
有人能解释一下我需要如何获取生产者的依赖项,构建类并运行它吗?
我需要sbt来建造它吗?我找不到任何在线资源清楚地解释了如何构建一个定制的Kafka生产者类。
以下是在producer类中导入的包:

org.apache.kafka.clients.producer.Callback;
org.apache.kafka.clients.producer.KafkaProducer;
org.apache.kafka.clients.producer.ProducerConfig;
org.apache.kafka.clients.producer.ProducerRecord;
org.apache.kafka.clients.producer.RecordMetadata;
org.apache.kafka.common.record.Records

提前谢谢

mftmpeh8

mftmpeh81#

我开发了一个定制的Kafka制作人,作为一个maven项目,我使用的依赖关系是:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.8.2.0</version>
</dependency>

我使用的导入:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;

我的生产者消息发送代码片段:

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, zkConnection);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);

byte[] byteData = null;
File myInputFile = new File(...);
try (InputStream inputStream = new FileInputStream(myInputFile)) {
    byteData = IOUtils.toByteArray(inputStream);
}

try (KafkaProducer<String, byte[]> producer = new KafkaProducer<String, byte[]>(props)) {
    producer.send(new ProducerRecord<String, byte[]>(topic, byteData));
}

相关问题