如何从kafka producer返回一个包含数十亿条记录的arraylist?

vuv7lop3  于 2021-06-07  发布在  Kafka
关注(0)|答案(2)|浏览(376)

我准备了一个Kafka制作人,在Kafka主题中加入一个列表。它可以很好地处理一百万行/记录。我得到的制作文件,由1.1亿条以上的记录组成。在我的Kafka制作人那里,处理如此庞大数据的最佳方式是什么?
下面是代码,我曾经处理过100万条记录,把同样的内容放到Kafka主题中大约需要4分钟。

import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.RandomAccessFile;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

public class KafkaSourceTask extends SourceTask {

    private String filename;

    private String topic;

    private RandomAccessFile raf;

    private long lastRecordedOffset = 0L;

    private BufferedReader bufferedReader = null;

    Schema schema = SchemaBuilder.struct().field("emp_id", 
            Schema.STRING_SCHEMA).field("name", Schema.STRING_SCHEMA)
            .field("last_name", Schema.STRING_SCHEMA).field("department", 
            Schema.STRING_SCHEMA).build();

public void start(Map<String, String> props) {
    filename = props.get("file");
    topic = props.get("topic");

}

@Override
public List<SourceRecord> poll() throws InterruptedException {
    double startTime = System.nanoTime();
    try {
        bufferedReader = new BufferedReader(new InputStreamReader(new FileInputStream(new File(filename)),
                StandardCharsets.UTF_8));
        raf = new RandomAccessFile(filename, "r");
        long filePointer = raf.getFilePointer();
        System.out.println(filePointer + " - " + lastRecordedOffset);
        if (bufferedReader.ready() && (filePointer > lastRecordedOffset || filePointer == 0)) {
            raf.seek(lastRecordedOffset);

            ArrayList<SourceRecord> records = new ArrayList<>();
            String line;
            while ((line = raf.readLine()) != null) {
                records.add(new SourceRecord(null, null, topic, schema, buildRecordValue(line)));
            }
            lastRecordedOffset = raf.getFilePointer();
            raf.close();
            bufferedReader.close();

            double endTime = System.nanoTime();
            return records;
        }
    }
    catch (IOException e) {

        e.printStackTrace();
    }

    return null;
}

@Override
public synchronized void stop() {
    try {
        raf.close();
    }
    catch (IOException e) {
        e.printStackTrace();
    }
}

private Struct buildRecordValue(String line) {
    String[] values = line.split(",");
    Struct value = new Struct(schema).put("emp_id", values[0]).put("name", values[1]).put("last_name", values[2])
            .put("department", values[3]);
    return value;
}

@Override
public String version() {
    // TODO Auto-generated method stub
    return null;
}
}

如果您对此有任何帮助或建议,我们将不胜感激。谢谢。

0vvn1miw

0vvn1miw1#

一个拥有数十亿条记录的数组列表?试想一下,如果你有10亿条记录,而每条记录的大小只有1个字节(这是一个荒谬的低估),那么你就有1sigb的内存消耗。
根据“大数据”的粗略和现成的定义,即无法在单个主机上放入内存的数据,您可能已经处于或超过了这一点,您需要开始使用大数据技术。首先您可以尝试多线程处理,然后您可以在多台计算机上尝试多线程处理,这就是使用kafka(客户机api)的优点,无论是从消费还是从生产到,都可以使这变得容易。

tag5nh1u

tag5nh1u2#

首先,Kafka生产者批记录发送给经纪人之前,你应该检查和发挥这两个配置 linger.ms 以及 batch.record.size .
现在您可以使用另一个线程来读取文件(我认为每行只有一条记录)并将它们放入java队列中,然后使用托管kafka producer的线程来连续读取该队列。
多个制作人被认为是一种反模式,特别是在写Kafka的主题时,请检查单作者原则。
不管是哪种方式,你都必须稍微调整一下你的Kafka制作人,但就像@cricket\u007所说的,你应该考虑使用Kafka连接一个文件csv连接器,至少如果你没有找到一个适合你的连接器,你可以自己开发一个连接器。
希望对你有帮助。

相关问题