Kafka流:按键部分重新处理

niknxzdl  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(541)

脚本:
在kafkastreams web会话场景中,具有无限(或数年)的保留期,具有交互式查询(必要时可以查看),具有多个客户端,每个客户端都有多个用户(每个用户对每个客户端都是特定的),其中分区是这样的:
按(clientid,userid)%numberofpartitions的函数进行分区,根据集群大小预先设置numberofpartitions。这将允许对(clientid、userid)数据执行会话,并应在节点之间提供均匀的数据分布(无热插拔、分区大小或写负载)。
但是,在查询时,我会按客户机(和时间范围)进行查询。因此,我将从sessions表构建一个聚合的ktable,其中key是客户机,sessions由(client、timestart、timeend)查询。这将使来自客户机的数据必须进入一个节点,这可能会带来可伸缩性问题(客户机太大),但由于数据已经聚合,我想这是可以管理的。
问题:
在这个场景中,我希望只能为一个客户机重新处理。
但是来自一个客户机的数据将分散在(可能是所有)分区中。
如何在影响较小的kafka流中实现部分再处理,同时保持(旧)状态可查询?

7d7tgy0s

7d7tgy0s1#

一般来说,我认为您已经知道问题的答案,使用您所描述的分区方案,如果您要重新处理客户机,您必须阅读所有分区,因为消息将分布在所有分区中。
在重新处理整个客户机时,我唯一能想到的限制开销的方法是实现一个分区方案,该方案为一个客户机将多个分区分组,然后在这些分区上分配用户,以避免用一个特别“大”的客户机使一个分区过载。希望这张照片能澄清我可能无法用文字解释的问题。。

实现此发行版的自定义分区器可能类似于以下代码。请恕我直言,到目前为止,这完全是理论上的,从来没有被测试过(甚至没有运行过),但它应该说明原理。

public class ClientUserPartitioner implements Partitioner {
int partitionGroupSize = 10;

@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    // For this we expect the key to be of the format "client/user"
    String[] splitValues = ((String)key).split("/");
    String client = splitValues[0];
    String user = splitValues[1];

    // Check that partitioncount is divisible by group size
    if (cluster.availablePartitionsForTopic(topic).size() % partitionGroupSize != 0) {
        throw new ConfigException("Partitioncount must be divisible by "+ partitionGroupSize +" for this partitioner but is " +
                cluster.availablePartitionsForTopic(topic).size() + " for topic " + topic);
    }

    // Calculate partition group from client and specific partition from user
    int clientPartitionOffset = Utils.murmur2(client.getBytes()) % partitionGroupSize * partitionGroupSize;
    int userPartition = Utils.murmur2(user.getBytes()) % partitionGroupSize;

    // Combine group and specific value to get final partition
    return clientPartitionOffset + userPartition;
}

@Override
public void configure(Map<String, ?> configs) {
    if (configs.containsKey("partition.group.size")) {
        this.partitionGroupSize = Integer.parseInt((String)configs.get("partition.group.size"));
    }
}

@Override
public void close() {
}

}
这样做当然会影响您的分发,您可能需要使用partitiongroupsize的不同值和具有代表性的数据样本来运行一些模拟,以估计分发的均匀性以及重新处理整个客户机时节省的开销。

相关问题