List<TopicPartition> topicPartitions = consumer.partitionsFor("your_topic").stream().map(partitionInfo -> new TopicPartition(partitionInfo.topic(), partitionInfo.partition()).toList();
consumer.assign(topicPartitions);
// Populate the map
Map<TopicPartition, Long> partitionTimestamp = new LinkedHashMap<>();
// Add the same timestamp received from frontend for all partitions
topicPartitions.forEach(topicPartition -> partitionTimestamp.put(topicPartition, timestampFromFrontend));
// Get the offsets and seek
Map<TopicPartition,OffsetAndTimestamp> offsetsForTimes = consumer.offsetsForTimes(offsetsForTimestamp);
// Seek to the offsets
offsetsForTimes.forEach( (tp, oft) -> consumer.seek(tp, oft.offset()) );
// Poll and return
consumer.poll();
2条答案
按热度按时间hi3rlvi21#
如何限制电子邮件功能一次在任何一个pod上工作?
简言之:对所有触发电子邮件的pod使用相同的消费群。通常,工作负载根据其所做的工作分为若干组。同一组的成员彼此分担工作量。
你当然可以给Kafka这样的消费者配置
bootstrap.servers
等你的豆荚。在该配置中,提供一个名为group.id
一些价值,比如email-trigger-group
例如,然后工作负载将按照您的预期进行共享。你可以在你的豆荚上使用标签来触发电子邮件。您可以为您的消费者使用相同的标签值
group.id
为了你所有的豆荚。我们可以把这个问题分成两个子问题:
1触发电子邮件
此工作负载可以由组中的多个使用者共享。
2回答前端的请求
使用说明书
consumer.assign()
对于整个主题(所有分区)。前端将指定它希望从中获得新消息的时间戳,即带有时间戳的消息
>
将从主题的所有分区检索此时间戳。使用consumer.offsetsForTimes()
要获取时间戳,轮询并将消息作为响应发送。xmakbtuz2#
如果您使用的是kafka,那么就可以使用分区。
对于每个主题,可以有多个分区。这些分区在使用者之间共享。
例如:
这样,特定组中只有一个使用者将使用消息。