我准备了一个Kafka制作人,在Kafka主题中加入一个列表。它可以很好地处理一百万行/记录。我得到的制作文件,由1.1亿条以上的记录组成。在我的Kafka制作人那里,处理如此庞大数据的最佳方式是什么?
下面是代码,我曾经处理过100万条记录,把同样的内容放到Kafka主题中大约需要4分钟。
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.RandomAccessFile;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
public class KafkaSourceTask extends SourceTask {
private String filename;
private String topic;
private RandomAccessFile raf;
private long lastRecordedOffset = 0L;
private BufferedReader bufferedReader = null;
Schema schema = SchemaBuilder.struct().field("emp_id",
Schema.STRING_SCHEMA).field("name", Schema.STRING_SCHEMA)
.field("last_name", Schema.STRING_SCHEMA).field("department",
Schema.STRING_SCHEMA).build();
public void start(Map<String, String> props) {
filename = props.get("file");
topic = props.get("topic");
}
@Override
public List<SourceRecord> poll() throws InterruptedException {
double startTime = System.nanoTime();
try {
bufferedReader = new BufferedReader(new InputStreamReader(new FileInputStream(new File(filename)),
StandardCharsets.UTF_8));
raf = new RandomAccessFile(filename, "r");
long filePointer = raf.getFilePointer();
System.out.println(filePointer + " - " + lastRecordedOffset);
if (bufferedReader.ready() && (filePointer > lastRecordedOffset || filePointer == 0)) {
raf.seek(lastRecordedOffset);
ArrayList<SourceRecord> records = new ArrayList<>();
String line;
while ((line = raf.readLine()) != null) {
records.add(new SourceRecord(null, null, topic, schema, buildRecordValue(line)));
}
lastRecordedOffset = raf.getFilePointer();
raf.close();
bufferedReader.close();
double endTime = System.nanoTime();
return records;
}
}
catch (IOException e) {
e.printStackTrace();
}
return null;
}
@Override
public synchronized void stop() {
try {
raf.close();
}
catch (IOException e) {
e.printStackTrace();
}
}
private Struct buildRecordValue(String line) {
String[] values = line.split(",");
Struct value = new Struct(schema).put("emp_id", values[0]).put("name", values[1]).put("last_name", values[2])
.put("department", values[3]);
return value;
}
@Override
public String version() {
// TODO Auto-generated method stub
return null;
}
}
如果您对此有任何帮助或建议,我们将不胜感激。谢谢。
2条答案
按热度按时间0vvn1miw1#
一个拥有数十亿条记录的数组列表?试想一下,如果你有10亿条记录,而每条记录的大小只有1个字节(这是一个荒谬的低估),那么你就有1sigb的内存消耗。
根据“大数据”的粗略和现成的定义,即无法在单个主机上放入内存的数据,您可能已经处于或超过了这一点,您需要开始使用大数据技术。首先您可以尝试多线程处理,然后您可以在多台计算机上尝试多线程处理,这就是使用kafka(客户机api)的优点,无论是从消费还是从生产到,都可以使这变得容易。
tag5nh1u2#
首先,Kafka生产者批记录发送给经纪人之前,你应该检查和发挥这两个配置
linger.ms
以及batch.record.size
.现在您可以使用另一个线程来读取文件(我认为每行只有一条记录)并将它们放入java队列中,然后使用托管kafka producer的线程来连续读取该队列。
多个制作人被认为是一种反模式,特别是在写Kafka的主题时,请检查单作者原则。
不管是哪种方式,你都必须稍微调整一下你的Kafka制作人,但就像@cricket\u007所说的,你应该考虑使用Kafka连接一个文件csv连接器,至少如果你没有找到一个适合你的连接器,你可以自己开发一个连接器。
希望对你有帮助。