java 如何为运行在Kubernetes集群上的微服务中的Kafka消费者分配特定的分区密钥?

u91tlkcl  于 2023-06-04  发布在  Java
关注(0)|答案(1)|浏览(141)

我有一个微服务,它在Kubernetes集群中的3个Pod上运行。这个服务有一个Kafka消费者,它监听Kafka主题。我希望特定的pod侦听具有特定键值的消息。我该怎么做?
我们可以这样做:

// Create Kafka consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        // Get the list of partitions for the topic
        List<PartitionInfo> partitions = consumer.partitionsFor(topic);

        // Find the partition(s) for the key value
        List<TopicPartition> assignedPartitions = new ArrayList<>();
        for (PartitionInfo partitionInfo : partitions) {
            String key = partitionInfo.leader().host();  // Use any key from the PartitionInfo
            if (key.equals(keyToAssign)) {
                TopicPartition topicPartition = new TopicPartition(topic, partitionInfo.partition());
                assignedPartitions.add(topicPartition);
            }
        }

        // Assign partitions to the consumer
        consumer.assign(assignedPartitions);

但是我不能在代码中硬编码keyToAssign,因为这是特定于pod的东西。

qv7cva1a

qv7cva1a1#

侦听具有特定键值的消息
对于Kafka消费者来说,这不是一个合适的用例,但是您可以在pod的容器上使用环境变量,而不是PartitionInfo
使用subscribe,而不是assign,然后在循环记录时检查它。

for (ConsumerRecord<String, ?> record: consumer.poll()) {
  if (record.key().equals(System.getEnv("FILTER_KEY")) {
    // process(record);
  } 
}

如果您想为一个已知键设置一个特定的分区,您应该调用DefaultPartitioner类,而不是consumer.partitionsFor
另一种选择是部署单独的服务。
1.读取所有键值对并构建KTable。
1.使用交互式查询发送RPC请求,查找该键

相关问题