Kafka 示例化状态存储目录中缺少数据

tnkciper  于 12个月前  发布在  Apache
关注(0)|答案(1)|浏览(98)

我的应用程序对 GroupedKTable 进行聚合,然后将其具体化为 PersistentKeyValueStore
我将 state.dir 配置为永久路径(不是默认的 /tmp/kstreams
我可以看到存储为changelog-topic中的事件的键值,而且我的应用程序也成功地从状态存储中获取它们。
但是当我去检查 state.dir 目录时,我发现那里只有元数据。没有数据(特别是 *.sst文件 *)(即使我指定了Persistent存储)
为什么会这样呢?我在哪里可以找到数据?在我看来,这是一个InMemoryStore的行为,
物化的看起来像:

val StoreSupplier = Stores.persistentTimestampedKeyValueStore(name)
Materialized.as(StoreSupplier)(KeySerde, ValueSerde)

然后我把它传递给聚合体。
注意:在本地机器上,使用TopologyTestDriver,我可以看到 .sst 文件。我的问题适用于应用程序的真实的部署

jjjwad0x

jjjwad0x1#

这实际上是一个RocksDB细节。
当kafka-streams将数据刷新到RocksDB时(无论是 * commit.interval.ms * 过期,还是该高速缓存已满),RocksDB都会将数据存储在 active memtable 和transaction log(在kafka-streams中,transaction log被配置为changelog-topic)中。
只有当memtable已满时(在kafka-streams中配置为16 MB),memtable才变为只读,此时可以将其写入 .sst 文件
只要store的大小小于memtable的大小,就无法在 state.dir 中看到 .sst 文件
我觉得下面这篇文章对了解RocksDB细节很有意思:https://www.confluent.io/blog/how-to-tune-rocksdb-kafka-streams-state-stores-performance/

相关问题