我使用kafka连接两个流,连接窗口为3天:
...
private final long retentionHours = Duration.ofDays(3);
...
var joinWindow = JoinWindows.of(Duration.ofMinutes(retentionHours))
.grace(Duration.ofMillis(0));
var joinStores = StreamJoined.with(Serdes.String(), aggregatorSerde, aggregatorSerde)
.withStoreName("STORE-1")
.withName("STORE-2");
stream1.join(stream2, streamJoiner(), joinWindow, joinStores);
通过上面的实现,我发现kafka创建了state folder:/tmp/kafka streams,(看起来像rocksdb),并且它不断增长。同时,Kafka集群的国有商店也在不断增长。
因此,我将streams join实现更改为:
...
private final long retentionHours = Duration.ofDays(3);
...
var joinWindow = JoinWindows.of(Duration.ofMinutes(retentionHours))
.grace(Duration.ofMillis(0));
var joinStores = StreamJoined.with(Serdes.String(), aggregatorSerde, aggregatorSerde)
.withStoreName("STORE-1")
.withName("STORE-2")
.withThisStoreSupplier(createStoreSupplier("MEM-STORE-1"))
.withOtherStoreSupplier(createStoreSupplier("MEM-STORE-2"));
stream1.join(stream2, streamJoiner(), joinWindow, joinStores);
...
private WindowBytesStoreSupplier createStoreSupplier(String storeName) {
var window = Duration.ofMinutes(retentionHours * 2)
.toMillis();
return new InMemoryWindowBytesStoreSupplier(storeName, window, window, true);
}
现在,没有状态文件夹:/tmp/kafka streams。
这是否意味着在内存中,Windows存储供应商根本不使用磁盘?如果是,它是如何工作的?
而且,我仍然看到Kafka集群中的状态存储在不断增长。
1条答案
按热度按时间ccgok5k51#
这是否意味着在内存中,Windows存储供应商根本不使用磁盘?如果是,它是如何工作的?
iirc公司,
InMemoryWindowBytesStore
根本不用磁盘。一般来说,逻辑状态存储实际上被划分为多个状态存储“示例”(想想:每个流任务都有自己的本地状态存储示例)。对于
InMemoryWindowBytesStore
具体地说,通过设计,这些存储示例管理内存中的所有本地数据。而且,我仍然看到Kafka集群中的状态存储在不断增长。
然而
InMemoryWindowBytesStore
仍然是容错的。对于新的kafka流开发人员来说,这常常令人困惑,因为在大多数软件中,“内存中”总是意味着“如果发生了什么事情,数据就会丢失”。然而,Kafka的情况并非如此。状态存储总是持久地“备份”到其kafka changelog主题,而不管您使用的是默认状态存储(使用rocksdb)还是内存中的状态存储。这解释了为什么您会在kafka集群中看到内存状态(changelog)数据。顺便说一句,数据不应该永远增长,因为changelog主题会被压缩以防止出现这种情况。注意:但是,当使用内存存储时,可能发生的情况是,应用程序示例可能会耗尽内存(oom),从而崩溃。虽然您的状态数据永远不会丢失,如上所述,但由于oom崩溃,您的应用程序将不会运行/它将仅部分运行(一些应用程序示例运行oom,其他示例则不运行oom)。这个oom问题不适用于默认存储(rocksdb),因为它管理磁盘上的数据,并且仅将内存(ram)用于缓存目的。但是,同样,应用程序可用性的问题与数据安全是正交的(无论应用程序是否崩溃,数据都是安全的)。