kafka延迟队列实现

siotufzp  于 2021-06-08  发布在  Kafka
关注(0)|答案(4)|浏览(657)

想要使用高级使用者api实现延迟使用者吗
主旨:
按键生成消息(每个消息包含创建时间戳)这确保每个分区按生成的时间对消息进行排序。
auto.commit.enable=false(将在每个消息进程之后显式提交)
使用消息
检查消息时间戳并检查是否经过了足够的时间
处理消息(此操作永远不会失败)
提交1偏移量

while (it.hasNext()) {
  val msg = it.next().message()
  //checks timestamp in msg to see delay period exceeded
  while (!delayedPeriodPassed(msg)) { 
     waitSomeTime() //Thread.sleep or something....
  }
  //certain that the msg was delayed and can now be handled
  Try { process(msg) } //the msg process will never fail the consumer
  consumer.commitOffsets //commit each msg
}

关于这一实施的一些担忧:
提交每个偏移可能会减慢zk的速度
consumer.commitofsets能否引发异常?如果是,我将使用相同的消息两次(可以用幂等消息求解)
等待很长时间而不提交偏移量的问题,例如延迟时间是24小时,将从迭代器获取下一个,睡眠24小时,处理并提交(zk session timeout?)
zk会话如何在没有提交新偏移量的情况下保持活动(设置一个配置单元zookeeper.session.timeout.ms可以在没有识别的情况下解析死亡消费者)
我还缺什么问题吗?
谢谢!

btqmn9zl

btqmn9zl1#

解决这个问题的一种方法是使用不同的主题,推送所有要延迟的消息。如果所有延迟的消息都应在相同的时间延迟后处理,这将是相当直接的:

while(it.hasNext()) {
    val message = it.next().message()

    if(shouldBeDelayed(message)) {
        val delay = 24 hours
        val delayTo = getCurrentTime() + delay
        putMessageOnDelayedQueue(message, delay, delayTo)
    }
    else {
       process(message)
    }

    consumer.commitOffset()
}

现在,所有常规消息都将被尽快处理,而那些需要延迟的消息将被放到另一个主题上。
好在我们知道延迟主题的头部的消息是应该首先处理的消息,因为它的delayto值是最小的。因此,我们可以设置另一个使用者来读取head消息,检查时间戳是否在过去,如果是,则处理消息并提交偏移量。如果没有,则不提交偏移量,而是一直休眠到该时间:

while(it.hasNext()) {
    val delayedMessage = it.peek().message()
    if(delayedMessage.delayTo < getCurrentTime()) {
        val readMessage = it.next().message
        process(readMessage.originalMessage)
        consumer.commitOffset()
    } else {
        delayProcessingUntil(delayedMessage.delayTo)
    }
}

如果有不同的延迟时间,您可以将主题划分为延迟时间(例如24小时、12小时、6小时)。如果延迟时间比这更动态,它会变得更复杂一些。你可以通过引入两个延迟主题来解决这个问题。从延迟主题中读取所有消息 A 并处理所有 delayTo 价值已经成为过去。在其他人中,你只找到最接近的一个 delayTo 然后把它们放在主题上 B . 睡眠,直到最近的一个应该被处理,并以相反的方式进行,即处理来自主题的消息 B 把不该处理的一次放回主题上 A .
回答您的具体问题(有些问题已在您的问题评论中提及)
提交每个偏移可能会减慢zk的速度
您可以考虑切换到在kafka中存储偏移量(这是0.8.2中提供的功能,请参阅) offsets.storage 消费者配置中的属性)
consumer.commitofsets能否引发异常?如果是,我将使用相同的消息两次(可以用幂等消息求解)
我相信它可以,如果它不能与偏移存储通信的话。如你所说,使用幂等消息解决了这个问题。
等待很长时间而不提交偏移量的问题,例如延迟时间是24小时,将从迭代器获取下一个,睡眠24小时,处理并提交(zk session timeout?)
上述解决方案不会有问题,除非消息本身的处理时间超过会话超时时间。
zk会话如何在没有提交新偏移量的情况下保持活动(设置一个配置单元zookeeper.session.timeout.ms可以在没有识别的情况下解析死亡消费者)
同样,使用上述方法,您不需要设置很长的会话超时。
我还缺什么问题吗?
总是有;)

olhwl3o2

olhwl3o22#

按计划键控列表或其redis替代方案可能是最好的方法。

d8tt03nd

d8tt03nd3#

我建议你换一条路。
在消费者的主线程中处理等待时间是没有意义的。这将是如何使用队列的反模式。从概念上讲,您需要以尽可能快的速度处理消息,并将队列保持在较低的加载因子。
相反,我将使用一个调度程序,它将为需要延迟的每条消息调度作业。通过这种方式,您可以处理队列并创建将在预定义的时间点触发的异步作业。
使用这种技术的缺点是,它可以感知内存中保存调度作业的jvm的状态。如果该jvm失败,那么您就失去了计划的作业,并且不知道该任务是否被执行。
有一些调度器实现,但是可以配置为在集群环境中运行,从而使您免受jvm崩溃的影响。
看看这个java调度框架:http://www.quartz-scheduler.org/

z0qdvdin

z0qdvdin4#

使用tibco ems或其他jms队列。它们内置了重试延迟。Kafka可能不是你所做的正确的设计选择

相关问题