Kafka加入存储

py49o6xq  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(402)

我使用kafka连接两个流,连接窗口为3天:

...
 private final long retentionHours = Duration.ofDays(3);

    ...
    var joinWindow = JoinWindows.of(Duration.ofMinutes(retentionHours))
                                .grace(Duration.ofMillis(0));
    var joinStores = StreamJoined.with(Serdes.String(), aggregatorSerde, aggregatorSerde)
                                 .withStoreName("STORE-1")
                                 .withName("STORE-2");
    stream1.join(stream2, streamJoiner(), joinWindow, joinStores);

通过上面的实现,我发现kafka创建了state folder:/tmp/kafka streams,(看起来像rocksdb),并且它不断增长。同时,Kafka集群的国有商店也在不断增长。
因此,我将streams join实现更改为:

...
private final long retentionHours = Duration.ofDays(3);
    ...
    var joinWindow = JoinWindows.of(Duration.ofMinutes(retentionHours))
                                .grace(Duration.ofMillis(0));
    var joinStores = StreamJoined.with(Serdes.String(), aggregatorSerde, aggregatorSerde)
                                 .withStoreName("STORE-1")
                                 .withName("STORE-2")
                                 .withThisStoreSupplier(createStoreSupplier("MEM-STORE-1"))
                                 .withOtherStoreSupplier(createStoreSupplier("MEM-STORE-2"));
    stream1.join(stream2, streamJoiner(), joinWindow, joinStores);

...
private WindowBytesStoreSupplier createStoreSupplier(String storeName) {
    var window = Duration.ofMinutes(retentionHours * 2)
                         .toMillis();
    return new InMemoryWindowBytesStoreSupplier(storeName, window, window, true);
}

现在,没有状态文件夹:/tmp/kafka streams。
这是否意味着在内存中,Windows存储供应商根本不使用磁盘?如果是,它是如何工作的?
而且,我仍然看到Kafka集群中的状态存储在不断增长。

ccgok5k5

ccgok5k51#

这是否意味着在内存中,Windows存储供应商根本不使用磁盘?如果是,它是如何工作的?
iirc公司, InMemoryWindowBytesStore 根本不用磁盘。
一般来说,逻辑状态存储实际上被划分为多个状态存储“示例”(想想:每个流任务都有自己的本地状态存储示例)。对于 InMemoryWindowBytesStore 具体地说,通过设计,这些存储示例管理内存中的所有本地数据。
而且,我仍然看到Kafka集群中的状态存储在不断增长。
然而 InMemoryWindowBytesStore 仍然是容错的。对于新的kafka流开发人员来说,这常常令人困惑,因为在大多数软件中,“内存中”总是意味着“如果发生了什么事情,数据就会丢失”。然而,Kafka的情况并非如此。状态存储总是持久地“备份”到其kafka changelog主题,而不管您使用的是默认状态存储(使用rocksdb)还是内存中的状态存储。这解释了为什么您会在kafka集群中看到内存状态(changelog)数据。顺便说一句,数据不应该永远增长,因为changelog主题会被压缩以防止出现这种情况。
注意:但是,当使用内存存储时,可能发生的情况是,应用程序示例可能会耗尽内存(oom),从而崩溃。虽然您的状态数据永远不会丢失,如上所述,但由于oom崩溃,您的应用程序将不会运行/它将仅部分运行(一些应用程序示例运行oom,其他示例则不运行oom)。这个oom问题不适用于默认存储(rocksdb),因为它管理磁盘上的数据,并且仅将内存(ram)用于缓存目的。但是,同样,应用程序可用性的问题与数据安全是正交的(无论应用程序是否崩溃,数据都是安全的)。

相关问题