hdfs—使用kafka connect transform archive和HDFSInkConnector时的刷新大小

pbwdgjma  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(389)

我有一个Kafka主题的数据,我想保存在我的数据湖。
在担心密钥之前,我能够使用hdfssinkconnector将avro值保存在datalake上的文件中。每个文件中消息值的数目由hdfssinkconnector的“flush.size”属性确定。
一切都很好。接下来我想把钥匙也保存起来。为此,我使用了kafka connect转换存档,它将字符串键和avro值 Package 到一个新的avro模式中。
这很有效。。。除了hdfssink连接器的flush.size现在被忽略。数据湖中保存的每个文件只有一条消息。
因此,这两种情况是:1)只保存值,每个文件中的值数由flush.size决定;2)保存键和值,每个文件只包含一条消息,flush.size被忽略。
这两种情况之间的唯一区别是hdfssinkconnector的配置,它指定了存档转换。

"transforms": "tran",
"transforms.tran.type": "com.github.jcustenborder.kafka.connect.archive.Archive"

kafka connect转换归档文件是否按设计忽略刷新大小,或者是否需要一些额外的配置,以便能够在数据湖上为每个文件保存多个键、值消息?

q9rjltbz

q9rjltbz1#

我有同样的问题,当使用Kafka地面军事系统接收器连接器。
在com.github.jcustenborder.kafka.connect.archive.archive代码中,将为每条消息创建一个新架构。

private R applyWithSchema(R r) {
final Schema schema = SchemaBuilder.struct()
    .name("com.github.jcustenborder.kafka.connect.archive.Storage")
    .field("key", r.keySchema())
    .field("value", r.valueSchema())
    .field("topic", Schema.STRING_SCHEMA)
    .field("timestamp", Schema.INT64_SCHEMA);
Struct value = new Struct(schema)
    .put("key", r.key())
    .put("value", r.value())
    .put("topic", r.topic())
    .put("timestamp", r.timestamp());
return r.newRecord(r.topic(), r.kafkaPartition(), null, null, schema, value, r.timestamp());

}
如果您查看kafka transform insertfield$value方法,您将看到它每次都使用synchronizedcache来检索相同的模式。
https://github.com/axbaretto/kafka/blob/ba633e40ea77f28d8f385c7a92ec9601e218fb5b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/insertfield.java#l170
因此,您只需要创建一个模式(在apply函数之外)或使用相同的synchronizedcache代码。

相关问题