scala—使用React式kafka有条件地处理消息

vql8enpb  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(576)

我一直在尝试使用React式Kafka,我有一个条件处理的问题,我没有找到一个令人满意的答案。
基本上,我正在尝试使用一个Kafka主题,其中包含大量的消息(每天大约100亿条消息),并且仅根据消息的某些属性处理其中的几条消息(每天几千条),然后将已处理版本的消息推送到另一个主题,我正在努力正确地做到这一点。
我的第一次尝试是:

// This is pseudo code.
Source(ProducerSettings(...))
    .filter(isProcessable(_))
    .map(process(_))
    .via(Producer.flow(producerSettings))
    .map(_.commitScalaDsl())
    .runWith(Sink.ignore)

这种方法的问题是,我只在读取我能够处理的消息时才提交,这显然是不酷的,因为如果我必须停止并重新启动程序,那么我必须重新读取一堆无用的消息,而且因为它们太多了,我不能这样做。
然后,我尝试使用graphdsl,方法是围绕以下行:

in ~> broadcast ~> isProcessable    ~> process ~> producer ~> merge ~> commit
   ~> broadcast ~>              isNotProcessable           ~> merge

这个解决方案显然也不好,因为我无法处理的消息会经过图的第二个分支,并在可处理的消息被真正推送到目的地之前提交,这比第一个消息更糟糕,因为它甚至不能保证至少一次传递。
有人知道我怎样才能解决这个问题吗?

2mbi3lxu

2mbi3lxu1#

我以前解决类似问题的一种方法是利用序列号来保证排序。
例如,您可以构建一个流,如您描述的保存提交:

in ~> broadcast ~> isProcessable ~> process ~> producer ~> merge ~> out
   ~> broadcast ~>            isNotProcessable          ~> merge

然后把它 Package 成这样一个订单保存流(取自我们公司开发的一个库):orderpreservingflow。然后,产生的流可以被发送到提交器接收器。
如果您的处理阶段保证了排序,那么您甚至可以通过将逻辑直接嵌入到图形中来提高效率并避免任何缓冲:

in ~> injectSeqNr ~> broadcast ~> isProcessable ~> process ~> producer ~> mergeNextSeqNr ~> commit
                  ~> broadcast ~>             isNotProcessable         ~> mergeNextSeqNr

在这里,mergenextseqnr只是一个修改过的合并阶段,如果端口1上有可用的输入,那么如果它的序列号是预期的序列号,您将立即发出它,否则您只需等待另一个端口上有可用的数据。
最终结果应该与使用上面的流 Package 完全相同,但是如果嵌入它,您可能会更容易地使其适应您的需要。

相关问题