kafka grouptable测试在使用processortopologytestdriver时生成额外的消息

w1jd8yoj  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(351)

我已经编写了一个流,它接收消息并发送一个已经出现的键表。如果有什么东西出现,它将显示1的计数。这是我的生产代码的简化版本,以演示这个bug。在实时运行中,每个接收到的消息都会发送一条消息。
然而,当我使用processortopologytestdriver在单元测试中运行它时,我得到了不同的行为。如果收到之前已经看到的密钥,我会收到一条额外的消息。
如果我用键“key1”,然后是“key2”,然后是“key1”发送消息,我会得到以下输出。

key1 - 1
key2 - 1
key1 - 0
key1 - 1

出于某种原因,它会先递减值,然后再将其加回去。这仅在使用processortopologytestdriver时发生。这是预期的吗?附近有工作吗?或者这是虫子?
以下是我的拓扑结构:

final StreamsBuilder builder = new StreamsBuilder();
    KGroupedTable<String, String> groupedTable
            = builder.table(applicationConfig.sourceTopic(), Consumed.with(Serdes.String(), Serdes.String()))
            .groupBy((key, value) -> KeyValue.pair(key, value), Serialized.with(Serdes.String(), Serdes.String()));

    KTable<String, Long> countTable = groupedTable.count();

    KStream<String, Long> countTableAsStream = countTable.toStream();
    countTableAsStream.to(applicationConfig.outputTopic(), Produced.with(Serdes.String(), Serdes.Long()));

下面是我的单元测试代码:

TopologyWithGroupedTable top = new TopologyWithGroupedTable(appConfig, map);
    Topology topology = top.get();
    ProcessorTopologyTestDriver driver = new ProcessorTopologyTestDriver(config, topology);
    driver.process(inputTopic, "key1", "theval", Serdes.String().serializer(), Serdes.String().serializer());
    driver.process(inputTopic, "key2", "theval", Serdes.String().serializer(), Serdes.String().serializer());
    driver.process(inputTopic, "key1", "theval", Serdes.String().serializer(), Serdes.String().serializer());

    ProducerRecord<String, Long> outputRecord = driver.readOutput(outputTopic, keyDeserializer, valueDeserializer);
    assertEquals("key1", outputRecord.key());
    assertEquals(Long.valueOf(1L), outputRecord.value());
    outputRecord = driver.readOutput(outputTopic, keyDeserializer, valueDeserializer);
    assertEquals("key2", outputRecord.key());
    assertEquals(Long.valueOf(1L), outputRecord.value());
    outputRecord = driver.readOutput(outputTopic, keyDeserializer, valueDeserializer);
    assertEquals("key1", outputRecord.key());
    assertEquals(Long.valueOf(1L), outputRecord.value()); //this fails, I get 0.  If I pull another message, it shows key1 with a count of 1

以下是完整代码的回购:
https://bitbucket.org/nsinha/testtopologywithgroupedtable/src/master/
流拓扑:https://bitbucket.org/nsinha/testtopologywithgroupedtable/src/master/src/main/java/com/nick/kstreams/topologywithgroupedtable.java
测试代码:https://bitbucket.org/nsinha/testtopologywithgroupedtable/src/master/src/test/java/com/nick/kstreams/topologywithgroupedtabletests.java

wnavrhmk

wnavrhmk1#

这不是一个bug,而是设计的行为(c.f.解释如下)。
行为上的差异是由于 KTable 状态存储缓存(参见。https://docs.confluent.io/current/streams/developer-guide/memory-mgmt.html). 在运行单元测试时,缓存会在每条记录之后刷新,而在生产运行中,情况并非如此。如果在生产运行中禁用缓存,我假设它的行为与单元测试中相同。
旁注: ProcessorTopologyTestDriver 是一个内部类,不是公共api的一部分。因此,没有兼容性保证。您应该改用官方的单元测试包:https://docs.confluent.io/current/streams/developer-guide/test-streams.html
为什么会看到两个记录:
在代码中,您使用的是 KTable#groupBy() 在您的特定用例中,您不需要更改密钥。但是,通常情况下,键可能会更改(取决于输入的值) KTable . 因此,如果输入 KTable 如果发生变化,则下游聚合需要从聚合结果中删除/减去旧的键值对,然后将新的键值对添加到聚合结果中。一般来说,新的键值对和旧的键值对是不同的,因此,它需要生成两个记录,因为减法和加法可能发生在不同的示例上,因为不同的键可能被不同的散列。这有道理吗?
因此,对于输入的每次更新 KTable ,两次更新两次结果 KTable 通常需要计算两个不同的键值对。对于键不改变的特定情况,kafka stream会做相同的事情(如果键实际上是相同的,则没有检查/优化此情况以将两个操作“合并”为一个)。

相关问题