如何使用来自kafka主题的特定消息

jecbmhm3  于 2021-06-07  发布在  Kafka
关注(0)|答案(2)|浏览(393)

我们正在使用kafka和elk构建一个应用程序。我们将消息存储到Kafka主题中。logstash然后从主题中读取数据,并将“仅特定”记录编入elasticsearch。比如,如果一个字段有某个值,那么索引它,否则不要索引它,让它留在主题中。这是目前为止的设计。但是在将来,我们将需要处理那些没有被索引到elasticsearch中的来自主题的消息。有没有办法以后只处理Kafka发来的那些没有编入索引的信息?如果是,怎么做?我们需要将它们存储在不同的主题中吗?或者我们可以使用相同的主题和相同的使用者组,但在存储在主题中时为它们分配不同的分区id。
有人能解释一下吗。
谢谢!

vngu2lb8

vngu2lb81#

您可以将已通过ElasticSearch读取和处理的偏移量范围保存在某个位置(在外部文件或弹性主题本身中),然后仅使用特定的范围间隔在第二时间内处理数据。
读取由特定偏移量分隔的kafka队列子集的一种简单方法是使用带有kafkardd的spark应用程序:https://spark.apache.org/docs/1.3.0/api/java/org/apache/spark/streaming/kafka/kafkardd.html

cigdeys3

cigdeys32#

只要你把数据保存在Kafka中(即主题保留期),你就可以随时阅读同一个主题。
因此,如果您想处理在elasticsearch中没有索引的数据,您可以简单地重新阅读主题并应用反向过滤器来获取所有没有在elasticsearch中结束的消息。为此,我建议使用不同的消费者组id。
当然,您也可以在将主题加载到elasticsearch时将未加载到elasticsearch的消息写入新主题(即,在一次传递日期时将其放入elasticsearch和新主题)。但是比你存储一些数据两次。因此,您在以后处理非索引数据时有一个空间/时间权衡:假设90%的数据最终都在elasticsearch中,那么复制10%的数据并在以后加速处理这些数据可能是值得的(您只需要阅读小10倍的新主题)。如果只有10%的数据被索引,复制90%的数据似乎是一种浪费,只节省10%的读取开销。

相关问题