java—这是通过kafka producer阅读消息并将其推向主题的正确方法吗

tjvv9vkg  于 2021-06-06  发布在  Kafka
关注(0)|答案(2)|浏览(339)

我写了这个Kafka生产者,从桌面读取一个文件,然后将文件中的数据作为值,并通过每次读取一行时添加一个来生成密钥。是正确的方法还是我做了不该做的事??请给我一些建议。我可以在我的主题中看到消息,但是每个消息都与一个键相关,所以如果我有一个用例,如果我从外部读取它,我可以像这样推送任何日志数据。我可以使用日志数据作为值,还是应该使用完全不同的逻辑。请帮忙

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.Properties;

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class SyncProducer {

    public static void main(String[] args) throws IOException {

        File file = new File("/Users/adityaverma/Desktop/ParseData.txt");

        BufferedReader br = new BufferedReader(new FileReader(file));

        Properties properties  = new Properties();
        properties.setProperty("bootstrap.servers","127.0.0.1:9092");
        properties.setProperty("key.serializer",StringSerializer.class.getName()); // our key and values are String
        properties.setProperty("value.serializer",StringSerializer.class.getName());
        properties.setProperty("acks", "1"); 
        properties.setProperty("retries", "3"); 
        properties.setProperty("linger.ms", "1"); 

        Producer<String,String> producer = new org.apache.kafka.clients.producer.KafkaProducer<String,String>(properties);
        // these will go in random partition as we increment the key

        String line = " ";

        int key = 0;
        while((line = br.readLine()) != null){
        //  System.out.println(line);

        ProducerRecord<String,String> producerRecord = new ProducerRecord<String,String>("try_Buffered3Part",Integer.toString(key),line);
         key++;
         System.out.println(key);
        producer.send(producerRecord);

        }
        producer.close();
        System.out.println("exit");
    }

}
6ss1mwsb

6ss1mwsb1#

看起来不错。如果你对随机分区很在行,你可以选择使用空键。
您可能还想看看logstash和kafka的集成。
https://www.elastic.co/guide/en/logstash/current/plugins-outputs-kafka.html

khbbv19g

khbbv19g2#

这是正确的方法吗
你的目标并不明确。你能使用终端的数据吗?那你生产的很好。
可以使用整数作为键。Kafka有一个整合器
使用null作为键或排除该参数是将数据发送到随机分区的标准方法,您不会遇到整数重载
我想通过kafka从某个源读取日志,然后将其写入hdfs。
如果您只想将数据记录到hadoop中,fluentd或logstash可以实现这一点。
在你开始使用Kafka的方法之前,你应该选择一种数据格式。例如,hadoop和kafka更喜欢avro或json而不是csv。confluent有大量将avro制作成Kafka的文档
您可以使用kafka connect hdfs连接器或apachenifi将kafka数据导入hadoop。不要重新发明轮子写你自己的消费者。

相关问题