我有一个Kafka主题的数据,我想保存在我的数据湖。
在担心密钥之前,我能够使用hdfssinkconnector将avro值保存在datalake上的文件中。每个文件中消息值的数目由hdfssinkconnector的“flush.size”属性确定。
一切都很好。接下来我想把钥匙也保存起来。为此,我使用了kafka connect转换存档,它将字符串键和avro值 Package 到一个新的avro模式中。
这很有效。。。除了hdfssink连接器的flush.size现在被忽略。数据湖中保存的每个文件只有一条消息。
因此,这两种情况是:1)只保存值,每个文件中的值数由flush.size决定;2)保存键和值,每个文件只包含一条消息,flush.size被忽略。
这两种情况之间的唯一区别是hdfssinkconnector的配置,它指定了存档转换。
"transforms": "tran",
"transforms.tran.type": "com.github.jcustenborder.kafka.connect.archive.Archive"
kafka connect转换归档文件是否按设计忽略刷新大小,或者是否需要一些额外的配置,以便能够在数据湖上为每个文件保存多个键、值消息?
1条答案
按热度按时间q9rjltbz1#
我有同样的问题,当使用Kafka地面军事系统接收器连接器。
在com.github.jcustenborder.kafka.connect.archive.archive代码中,将为每条消息创建一个新架构。
}
如果您查看kafka transform insertfield$value方法,您将看到它每次都使用synchronizedcache来检索相同的模式。
https://github.com/axbaretto/kafka/blob/ba633e40ea77f28d8f385c7a92ec9601e218fb5b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/insertfield.java#l170
因此,您只需要创建一个模式(在apply函数之外)或使用相同的synchronizedcache代码。