如何修改使用kafka connect s3连接器上载的s3对象的文件名?

smdncfj3  于 2021-06-04  发布在  Kafka
关注(0)|答案(0)|浏览(303)

我已经使用s3连接器几个星期了,我想改变连接器命名每个文件的方式。我使用的是hourlybasedpartition,因此每个文件的路径已经足够我查找每个文件,我希望文件名对于所有文件都是通用的,比如'data.json.gzip'(带有来自分区器的相应路径)。
例如,我想从这个开始:

<prefix>/<topic>/<HourlyBasedPartition>/<topic>+<kafkaPartition>+<startOffset>.<format>

对此:

<prefix>/<topic>/<HourlyBasedPartition>/Data.<format>

这样做的目的是只调用s3一次,以便以后下载文件,而不必先查找文件名,然后再下载。
在名为“kafka-connect-s3”的文件夹中搜索文件时,我找到了以下文件:https://github.com/confluentinc/kafka-connect-storage-cloud/blob/master/kafka-connect-s3/src/main/java/io/confluent/connect/s3/topicpartitionwriter.java 最后有以下功能:

private RecordWriter getWriter(SinkRecord record, String encodedPartition)
      throws ConnectException {
    if (writers.containsKey(encodedPartition)) {
      return writers.get(encodedPartition);
    }
    String commitFilename = getCommitFilename(encodedPartition);
    log.debug(
        "Creating new writer encodedPartition='{}' filename='{}'",
        encodedPartition,
        commitFilename
    );
    RecordWriter writer = writerProvider.getRecordWriter(connectorConfig, commitFilename);
    writers.put(encodedPartition, writer);
    return writer;
  }

  private String getCommitFilename(String encodedPartition) {
    String commitFile;
    if (commitFiles.containsKey(encodedPartition)) {
      commitFile = commitFiles.get(encodedPartition);
    } else {
      long startOffset = startOffsets.get(encodedPartition);
      String prefix = getDirectoryPrefix(encodedPartition);
      commitFile = fileKeyToCommit(prefix, startOffset);
      commitFiles.put(encodedPartition, commitFile);
    }
    return commitFile;
  }

  private String fileKey(String topicsPrefix, String keyPrefix, String name) {
    String suffix = keyPrefix + dirDelim + name;
    return StringUtils.isNotBlank(topicsPrefix)
           ? topicsPrefix + dirDelim + suffix
           : suffix;
  }

  private String fileKeyToCommit(String dirPrefix, long startOffset) {
    String name = tp.topic()
                      + fileDelim
                      + tp.partition()
                      + fileDelim
                      + String.format(zeroPadOffsetFormat, startOffset)
                      + extension;
    return fileKey(topicsDir, dirPrefix, name);
  }

我不知道这是否可以定制为我想做的,但似乎有点接近/相关的我的意图。希望有帮助。
(还向github提交了一个问题:https://github.com/confluentinc/kafka-connect-storage-cloud/issues/369)

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题