过去几周,我们一直在致力于将kafka connect添加到我们的数据平台,并认为这是一种将kafka的数据提取到s3数据湖中的有用方法。我们已经使用了fieldpartitioner和timebasepartitioner,看到了一些相当不错的结果。
我们还需要按用户id进行分区-但是尝试在用户id字段上使用fieldpartitioner后,连接器速度非常慢-尤其是与按日期等进行分区相比。我知道按id分区将创建大量的输出分区,因此速度不会那么快-这很好,但需要能够跟上与制片人合作。
到目前为止,我们已经尝试增加内存和堆-但我们通常不会看到任何内存问题,除非我们将flush.size增加到一个大的数字。我们也尝试过小的刷新大小,非常小和大的rotate.schedule.interval.ms配置。我们也研究了网络,但这似乎是好的-使用其他分区网络保持良好。
在可能为此浪费大量时间之前,是否有人尝试或成功地使用s3接收器连接器按id字段进行分区,尤其是在更大的主题上?或者有人对配置或设置有什么建议吗?
1条答案
按热度按时间wz1wpwve1#
我不习惯Kafka的连接器,但我至少会尽力帮忙。
我不知道你是否可以配置连接器到Kafka主题的分区级别;我想这是有办法的。
一种可能的方法是把重点放在客户向Kafka经纪人提供产品的步骤上。我的建议是实施你自己的
Partitioner
,以便“进一步”控制Kafka方面的数据发送位置。这是自定义分区器的一个示例/简化。例如
key
您发送的格式如下:id_name_date
. 此自定义分区程序尝试提取第一个元素(id
)然后选择所需的分区。即使你需要更多的时间
KafkaConnect
另一方面,我相信这个选择可能会有所帮助。假设一个主题有5个分区getPartitionForId
只需检查id的第一个数字就可以决定分区(为简化起见,min id为100,max id为599)。因此,如果收到的密钥是,f.e:
123_tempdata_20201203
,则分区方法将返回0
,即第一个分区。(图中显示的是p1而不是p0,因为我相信这个例子看起来更自然,但是请注意,第一个分区实际上定义为
partition 0
. 好吧,老实说,我在画这个的时候忘记了p0,也没有保存模板,所以我不得不找个借口,比如:看起来更自然)。基本上,这将是一个预先调整,或一个游戏,在s3上传之前。
我知道这可能不是理想的答案,因为我不知道你的系统的确切规格。我的猜测是有可能直接将主题分区指向s3位置。
如果不可能这样做,至少我希望这能给你一些进一步的想法。干杯!