我写了这个Kafka生产者,从桌面读取一个文件,然后将文件中的数据作为值,并通过每次读取一行时添加一个来生成密钥。是正确的方法还是我做了不该做的事??请给我一些建议。我可以在我的主题中看到消息,但是每个消息都与一个键相关,所以如果我有一个用例,如果我从外部读取它,我可以像这样推送任何日志数据。我可以使用日志数据作为值,还是应该使用完全不同的逻辑。请帮忙
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.Properties;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
public class SyncProducer {
public static void main(String[] args) throws IOException {
File file = new File("/Users/adityaverma/Desktop/ParseData.txt");
BufferedReader br = new BufferedReader(new FileReader(file));
Properties properties = new Properties();
properties.setProperty("bootstrap.servers","127.0.0.1:9092");
properties.setProperty("key.serializer",StringSerializer.class.getName()); // our key and values are String
properties.setProperty("value.serializer",StringSerializer.class.getName());
properties.setProperty("acks", "1");
properties.setProperty("retries", "3");
properties.setProperty("linger.ms", "1");
Producer<String,String> producer = new org.apache.kafka.clients.producer.KafkaProducer<String,String>(properties);
// these will go in random partition as we increment the key
String line = " ";
int key = 0;
while((line = br.readLine()) != null){
// System.out.println(line);
ProducerRecord<String,String> producerRecord = new ProducerRecord<String,String>("try_Buffered3Part",Integer.toString(key),line);
key++;
System.out.println(key);
producer.send(producerRecord);
}
producer.close();
System.out.println("exit");
}
}
2条答案
按热度按时间6ss1mwsb1#
看起来不错。如果你对随机分区很在行,你可以选择使用空键。
您可能还想看看logstash和kafka的集成。
https://www.elastic.co/guide/en/logstash/current/plugins-outputs-kafka.html
khbbv19g2#
这是正确的方法吗
你的目标并不明确。你能使用终端的数据吗?那你生产的很好。
可以使用整数作为键。Kafka有一个整合器
使用null作为键或排除该参数是将数据发送到随机分区的标准方法,您不会遇到整数重载
我想通过kafka从某个源读取日志,然后将其写入hdfs。
如果您只想将数据记录到hadoop中,fluentd或logstash可以实现这一点。
在你开始使用Kafka的方法之前,你应该选择一种数据格式。例如,hadoop和kafka更喜欢avro或json而不是csv。confluent有大量将avro制作成Kafka的文档
您可以使用kafka connect hdfs连接器或apachenifi将kafka数据导入hadoop。不要重新发明轮子写你自己的消费者。