在kubernetes部署中的任何一个pod上运行进程

gmol1639  于 2021-06-04  发布在  Kafka
关注(0)|答案(2)|浏览(337)

我有一个应用程序,它可以在多个pod上运行,并在流量增加时扩展。该应用程序的一个特点是“它从Kafka那里挑选信息并触发电子邮件”。
当多个pod运行时,所有pod都会触发电子邮件,因为所有pod都应该按照设计接收消息。
如何限制电子邮件功能一次在任何一个pod上工作?
集群-eks,编程语言-scala akka

hi3rlvi2

hi3rlvi21#

如何限制电子邮件功能一次在任何一个pod上工作?
简言之:对所有触发电子邮件的pod使用相同的消费群。通常,工作负载根据其所做的工作分为若干组。同一组的成员彼此分担工作量。
你当然可以给Kafka这样的消费者配置 bootstrap.servers 等你的豆荚。在该配置中,提供一个名为 group.id 一些价值,比如 email-trigger-group 例如,然后工作负载将按照您的预期进行共享。
你可以在你的豆荚上使用标签来触发电子邮件。您可以为您的消费者使用相同的标签值 group.id 为了你所有的豆荚。
我们可以把这个问题分成两个子问题:
1触发电子邮件
此工作负载可以由组中的多个使用者共享。
2回答前端的请求
使用说明书 consumer.assign() 对于整个主题(所有分区)。
前端将指定它希望从中获得新消息的时间戳,即带有时间戳的消息 > 将从主题的所有分区检索此时间戳。使用 consumer.offsetsForTimes() 要获取时间戳,轮询并将消息作为响应发送。

List<TopicPartition> topicPartitions = consumer.partitionsFor("your_topic").stream().map(partitionInfo -> new TopicPartition(partitionInfo.topic(), partitionInfo.partition()).toList();
consumer.assign(topicPartitions);

// Populate the map
Map<TopicPartition, Long> partitionTimestamp = new LinkedHashMap<>();

// Add the same timestamp received from frontend for all partitions
topicPartitions.forEach(topicPartition -> partitionTimestamp.put(topicPartition, timestampFromFrontend));

// Get the offsets and seek
Map<TopicPartition,OffsetAndTimestamp> offsetsForTimes = consumer.offsetsForTimes(offsetsForTimestamp);

// Seek to the offsets
offsetsForTimes.forEach( (tp, oft) -> consumer.seek(tp, oft.offset()) );

// Poll and return
consumer.poll();
xmakbtuz

xmakbtuz2#

如果您使用的是kafka,那么就可以使用分区。
对于每个主题,可以有多个分区。这些分区在使用者之间共享。
例如:

Email Topic: Partitions[0,1,2,3,4,5]

Email Consumer Group:
   Consumer 1: Partitions[0,3]
   Consumer 2: Partitions[1,4]
   Consumer 3: Partitions[2,5]

On Scale Up Event:
   Consumer 1: Partitions[0]
   Consumer 2: Partitions[1]
   Consumer 3: Partitions[2,5]
   Consumer 4: Partitions[3]
   Consumer 5: Partitions[4]

On Scale Down Event:
   Consumer 1: Partitions[0,2,4]
   Consumer 2: Partitions[1,3,5]

这样,特定组中只有一个使用者将使用消息。

相关问题