我有5Kafka主题,其中100 msgs将流动每秒。消息格式如下{null,json}(分区:10)我需要根据json中的值和java应用程序中的进程从过去提取一条记录。正确的方法是什么?ksqldb流Kafka河K表提前谢谢。
o75abkj41#
首先,想想你想在你的应用程序中实现什么。如果您想要数据库功能,比如复杂连接、使用存储过程等等,那么kafka就不是您的选择(即使kafka可以进行复杂连接,但是您可以编写更复杂的代码)。你的情景似乎适合 Kafka Streams 拓扑,其中可以使用物化 KTable (事实上是 Store )这将允许您通过分区键查找数据。性能将与数据库顶部相同,因为 Store 它具体化为 RocksDB 数据库。请记住,要做到这一点,您将面临在进行分区时常见的所有问题(首先,为所有数据项找到一个公共分区键)。你可以找到更多关于 Kafka Streams 在这里工作:https://docs.confluent.io/current/streams/architecture.html 关于 Kafka Store : https://kafka.apache.org/11/javadoc/org/apache/kafka/streams/state/stores.html.
Kafka Streams
KTable
Store
RocksDB
Kafka Store
nqwrtyyt2#
虽然可以把Kafka当作db,但我强烈建议你不要采用这种方法。kakfa是一个消息代理,因此如果您想查看过去的消息,就必须重新处理整个主题唯一可以“优雅地”处理这种情况的场景是,如果您知道消息的偏移量,那么您可以设置您的消费者直接前往那里,但是由于您描述的用例,我认为您没有它我没有和ksqldb一起工作,但它看起来像是普通的kqslksql只是kafka流之上的一个抽象,就像kafka流是消费生产者之上的一个抽象一样。每个抽象都有较少的功能你可以使用任何你想要的方法来实现你的目标,注意有几个因素会让你做出选择,例如ksql是最简单的方法,但是您需要一个ksql服务器(afaik)ksql不适用于所有序列化格式当您有etl场景(从kafka读取、处理消息、发送回kafka)时,kafka流最适合如果你需要完全控制流量,正规的消费品生产商会给你
2条答案
按热度按时间o75abkj41#
首先,想想你想在你的应用程序中实现什么。如果您想要数据库功能,比如复杂连接、使用存储过程等等,那么kafka就不是您的选择(即使kafka可以进行复杂连接,但是您可以编写更复杂的代码)。
你的情景似乎适合
Kafka Streams
拓扑,其中可以使用物化KTable
(事实上是Store
)这将允许您通过分区键查找数据。性能将与数据库顶部相同,因为Store
它具体化为RocksDB
数据库。请记住,要做到这一点,您将面临在进行分区时常见的所有问题(首先,为所有数据项找到一个公共分区键)。
你可以找到更多关于
Kafka Streams
在这里工作:https://docs.confluent.io/current/streams/architecture.html 关于Kafka Store
: https://kafka.apache.org/11/javadoc/org/apache/kafka/streams/state/stores.html.nqwrtyyt2#
虽然可以把Kafka当作db,但我强烈建议你不要采用这种方法。kakfa是一个消息代理,因此如果您想查看过去的消息,就必须重新处理整个主题
唯一可以“优雅地”处理这种情况的场景是,如果您知道消息的偏移量,那么您可以设置您的消费者直接前往那里,但是由于您描述的用例,我认为您没有它
我没有和ksqldb一起工作,但它看起来像是普通的kqsl
ksql只是kafka流之上的一个抽象,就像kafka流是消费生产者之上的一个抽象一样。每个抽象都有较少的功能
你可以使用任何你想要的方法来实现你的目标,注意有几个因素会让你做出选择,例如
ksql是最简单的方法,但是您需要一个ksql服务器
(afaik)ksql不适用于所有序列化格式
当您有etl场景(从kafka读取、处理消息、发送回kafka)时,kafka流最适合
如果你需要完全控制流量,正规的消费品生产商会给你