kafka连接源连接器-内存不足问题

8fq7wneg  于 2021-06-07  发布在  Kafka
关注(0)|答案(0)|浏览(233)

我已经写了一个自定义Kafka连接插件读取文件和发布avro消息到一个主题。我正在使用 LineNumberReader 读取文件中的每一行,将记录与模式结合起来,并发布到主题。只有当我返回一个 SourceRecord 在poll方法中。有时,如果是一个大文件,我会遇到内存不足的问题。我研究过这个问题,读过很多博客和问题,但似乎没有什么对我有用。我读到了 commit() 中的方法 SourceTask 但不是很清楚。有人遇到过类似的问题吗?下面的代码段:

@Override
public List<SourceRecord> poll() throws InterruptedException {
final List<SourceRecord> results = new ArrayList<>();
//Get reader based on certain params:
FileReader reader = myReader;
while (reader.hasNext()) {
    results.add(getSourceRecord(file, reader.currentOffset(), reader.next()));
   }
 return results;
}

private getSourceRecord convert(String fileName, Offset offset, Struct struct) {
    return new SourceRecord(
            new HashMap<String, Object>() {
                {
                    put("path", fileName);
                }
            },
            Collections.singletonMap("offset", offset.getRecordOffset()),
            config.getWriteTopic(),
            struct.schema(),
            struct
    );
}

public boolean hasNext() {
    if (currentLine != null) {
        return true;
    } else if (finished) {
        return false;
    } else {
        try {
            while (true) {
                String line = reader.readLine();
                offset.setOffset(reader.getLineNumber());
                if (line == null) {
                    finished = true;
                    return false;
                }
                currentLine = line;
           //Removed for brevity     
        }
    }
}

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题