了解Kafka中max.in.flight.request属性的用例

mzsu5hc0  于 2023-02-03  发布在  Apache
关注(0)|答案(2)|浏览(214)

我正在构建一个Sping Boot 消费者-生产者项目,Kafka是两个微服务之间的中间人。这个项目的主题是一场篮球比赛。这是一个小的状态机图,其中显示了事件。还会有更多不同的事件,这只是一个片段。

开始事件:

{
  "id" : 5,
  "actualStartTime" : "someStartTime"
}

点事件:

{
   "game": 5,
   "type": "POINT",
    "payload": {
          "playerId": 44,
          "value": 3
    }
}

辅助事件:

{
  "game": 4,
  "type": "ASSIST",
  "payload": {
    "playerId": 278,
    "value": 1
  }
}

跳转事件:

{
   "game": 2,
   "type": "JUMP",
   "payload": {
     "playerId": 55,
     "value": 1
   }
 }

结束事件:

{
    "id" : 5,
    "endTime" : "someStartTime"
}

这里要注意的主要事情是,如果有一个辅助事件,它必须跟在点事件之后。
由于我是Kafka的新手,我将保持事情的简单性,并有一个代理,一个主题和一个分区。对于我的用例,我需要保持这些事件中的每一个的顺序,因为它们实际上发生在球场上(我有一个7000行的json文件,以及大量的这些和其他事件)。
因此,假设有人从Admin UI向Producer应用发送这些事件(例如通过WebSockets)。Producer应用将执行一些简单的验证或任何需要执行的操作。现在,我们还可以想象我们有两个Producer应用示例,一个位于ip:8080(prd 1),另一个位于ip:8081(prd 2)。
在现实中,这三个事件发生的顺序:协助-〉点-〉跳跃。球场上的操作员按这个顺序发送这三个事件。
Assist事件在prd 1上发送,Point在prd 2上发送。现在假设prd 1和Kafka群集之间的通信出现网络故障。由于在撰写本文时我们使用的是Kafka最新版本,因此我们已经有了enabled.idempotence=true,Assist事件不会发送两次。
在prd 1(朝向Kafka)上重试辅助事件期间,prd 2上的点事件成功通过。然后辅助事件通过,之后跳转事件(在任何生成器处)也在Kafka中结束。
现在,我们的队列中有:点-〉辅助-〉跳跃。这是不允许的。
我的问题是,这些类型的问题是否应该由应用程序的业务逻辑(例如Spring State Machine)来处理,或者这种排序可以由Kafka来处理?
在后者的情况下,属性max.in.flight.request=1负责排序吗?是否有其他属性可以保持排序?
顺便说一句,对于单个比赛使用单个分区,对于任何分区使用多个消费者,这是一个好的策略吗?最有可能的是,我将流式传输不同类型的比赛(篮球、足球、高尔夫,跨越不同的联赛和国家),其中大多数都需要某种排序。
这也许可以用KStreams来完成,但我仍然处于Kafka陡峭的学习曲线上。
更新1(Jessica Vasey评论后):
嗨,谢谢你的评论。不幸的是,我没有得到所有的拼图。最让我困惑的是你使用的一些术语和事情发生的顺序。不是说它不正确,只是我不明白。
我将有两个微服务,所以两个生产者。我必须能够理解Kafka在微服务的世界,因为我是Java Spring开发人员和它的所有关于微服务和多个示例。
假设在prd 1上出现了一些dto事件[Start -〉Point -〉Assist],它们被作为ProducerRequest(https://kafka.apache.org/documentation/#recordbatch)发送,它们被放置在RECORDS字段中。在prd 2上,我们得到的[Point -〉Jump]也是ProducerRequest。在我的理解中,它们是两个独立的动态请求(可能有5个?)?它们的顺序基于时间戳?
所以当加入集群时,Kafka将id分配给生产者,假设prd 1为“0”,prd 2为“1”(我猜这也取决于它们被分配的主题分区)。我不明白是每个RecordBatch都有其单调递增的序列号id,还是RecordBatch中的每个Kafka消息都有其单调递增的序列号,或者两者都有?还有"time to recover“部分比如,如果我得到OutofOrderSequenceException,这是否意味着[Point -〉Jump]批处理(可能还有其他正在运行的请求和生产者缓冲区中的其他批处理)将一直驻留在Kafka上,直到delivery.timeout.ms过期或最终成功发送[Start -〉Point -〉Assist]?

iaqfqrcu

iaqfqrcu1#

抱歉让你更困惑了,你的逻辑很复杂!希望我能为你澄清一些问题。我以为你只有一个制作人,但是在重新阅读你的帖子后,我发现你有两个制作人。
你不能保证消息在两个生产者之间的顺序。你只能保证每个生产者的顺序。这篇文章很好地解释了这一点Kafka ordering with multiple producers on same topic and parititon
关于这个问题:
在我的理解中,它们是两个独立的飞行中请求(5个可能的请求中?)?它们的排序是基于时间戳的?
是的,每个生产者将每个连接的最大飞行请求数设置为5.
你可以在你的制作器中提供一个timestamp,这可能会对你的情况有所帮助,但是,我现在不想在这方面做太多的细节介绍,我会先回答你的问题。
我不明白是每个RecordBatch都有其单调递增的序列号ID,还是RecordBatch中的每个Kafka消息都有其单调递增的序列号,或者两者都有?还有“恢复时间"部分也让我很困扰。比如,如果我遇到OutofOrderSequenceException,这是否意味着[Point -〉Jump]批处理(可能还有其他运行中的请求和生产者缓冲区中的其他批处理)将一直停留在Kafka上,直到www. example www.example.com 过期或最终成功发送[开始-〉点-〉辅助]时?
每个消息都被分配了一个单调递增的序列号。这个LinkedIn帖子解释的比我以前能解释的要好!
是的,其他批次将保留在生产商上,直到前一批次得到确认(可能不到2分钟)或delivery.timeout.ms过期。

yuvru6vn

yuvru6vn2#

即使是max.in.flight.requests.per.connection > 1,设置enable.idempotence=true也应该保留消息顺序,因为这会为消息分配一个序列号。当一个批处理失败时,同一分区的所有后续批处理都会失败,并显示OutofOrderSequenceException
分区的数量应该由你的目标吞吐量决定。如果你想把篮球比赛发送到一个分区,把高尔夫球发送到另一个分区,你可以使用键来决定哪个消息应该被发送到哪里。

相关问题