kafka connect s3接收器连接器按id字段划分大型主题

qnzebej0  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(376)

过去几周,我们一直在致力于将kafka connect添加到我们的数据平台,并认为这是一种将kafka的数据提取到s3数据湖中的有用方法。我们已经使用了fieldpartitioner和timebasepartitioner,看到了一些相当不错的结果。
我们还需要按用户id进行分区-但是尝试在用户id字段上使用fieldpartitioner后,连接器速度非常慢-尤其是与按日期等进行分区相比。我知道按id分区将创建大量的输出分区,因此速度不会那么快-这很好,但需要能够跟上与制片人合作。
到目前为止,我们已经尝试增加内存和堆-但我们通常不会看到任何内存问题,除非我们将flush.size增加到一个大的数字。我们也尝试过小的刷新大小,非常小和大的rotate.schedule.interval.ms配置。我们也研究了网络,但这似乎是好的-使用其他分区网络保持良好。
在可能为此浪费大量时间之前,是否有人尝试或成功地使用s3接收器连接器按id字段进行分区,尤其是在更大的主题上?或者有人对配置或设置有什么建议吗?

wz1wpwve

wz1wpwve1#

我不习惯Kafka的连接器,但我至少会尽力帮忙。
我不知道你是否可以配置连接器到Kafka主题的分区级别;我想这是有办法的。
一种可能的方法是把重点放在客户向Kafka经纪人提供产品的步骤上。我的建议是实施你自己的 Partitioner ,以便“进一步”控制Kafka方面的数据发送位置。
这是自定义分区器的一个示例/简化。例如 key 您发送的格式如下: id_name_date . 此自定义分区程序尝试提取第一个元素( id )然后选择所需的分区。

public class IdPartitioner implements Partitioner 
{       
   @Override
   public int partition(String topic, Object key, byte[] kb, 
                        Object v, byte[] vb, Cluster cl) 
   {
       try 
       {
         String pKey= (String) key;
         int id = Integer.parseInt(pKey.substring(0,pKey.indexOf("_")));

          /* getPartitionForId would decide which partition number corresponds
           for the received ID.You could also implement the logic directly here.*/

         return getPartitionForId(id);
       }
       catch (Exception e)
       {return 0;}
   }

   @Override
   public void close() 
   {
     //maybe some work here if needed
   }
}

即使你需要更多的时间 KafkaConnect 另一方面,我相信这个选择可能会有所帮助。假设一个主题有5个分区 getPartitionForId 只需检查id的第一个数字就可以决定分区(为简化起见,min id为100,max id为599)。
因此,如果收到的密钥是,f.e: 123_tempdata_20201203 ,则分区方法将返回 0 ,即第一个分区。
(图中显示的是p1而不是p0,因为我相信这个例子看起来更自然,但是请注意,第一个分区实际上定义为 partition 0 . 好吧,老实说,我在画这个的时候忘记了p0,也没有保存模板,所以我不得不找个借口,比如:看起来更自然)。

基本上,这将是一个预先调整,或一个游戏,在s3上传之前。
我知道这可能不是理想的答案,因为我不知道你的系统的确切规格。我的猜测是有可能直接将主题分区指向s3位置。
如果不可能这样做,至少我希望这能给你一些进一步的想法。干杯!

相关问题