我有一个微服务,它在Kubernetes集群中的3个Pod上运行。这个服务有一个Kafka消费者,它监听Kafka主题。我希望特定的pod侦听具有特定键值的消息。我该怎么做?
我们可以这样做:
// Create Kafka consumer
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// Get the list of partitions for the topic
List<PartitionInfo> partitions = consumer.partitionsFor(topic);
// Find the partition(s) for the key value
List<TopicPartition> assignedPartitions = new ArrayList<>();
for (PartitionInfo partitionInfo : partitions) {
String key = partitionInfo.leader().host(); // Use any key from the PartitionInfo
if (key.equals(keyToAssign)) {
TopicPartition topicPartition = new TopicPartition(topic, partitionInfo.partition());
assignedPartitions.add(topicPartition);
}
}
// Assign partitions to the consumer
consumer.assign(assignedPartitions);
但是我不能在代码中硬编码keyToAssign
,因为这是特定于pod的东西。
1条答案
按热度按时间qv7cva1a1#
侦听具有特定键值的消息
对于Kafka消费者来说,这不是一个合适的用例,但是您可以在pod的容器上使用环境变量,而不是
PartitionInfo
使用subscribe,而不是assign,然后在循环记录时检查它。
如果您想为一个已知键设置一个特定的分区,您应该调用
DefaultPartitioner
类,而不是consumer.partitionsFor
另一种选择是部署单独的服务。
1.读取所有键值对并构建KTable。
1.使用交互式查询发送RPC请求,查找该键