使用kstream连接的kafka拓扑的java单元测试

ibps3vxo  于 2021-06-06  发布在  Kafka
关注(0)|答案(2)|浏览(335)

我有一个拓扑结构,它有两个kstream连接,我面临的问题是,当topologytestdriver用pipeinput发送几个consumerrecords,然后readoutput进行单元测试时。好像没用。
我想这可能是因为连接在实际的kafka中使用了内部rocksdb,而我们在测试中没有使用它。
所以我一直在寻找解决这个问题的办法,但找不到。
注意:当删除kstream kstream连接时,这种测试方法可以很好地工作。

kgsdhlau

kgsdhlau1#

我有一个拓扑结构,它有两个kstream连接,我面临的问题是,当topologytestdriver用pipeinput发送几个consumerrecords,然后readoutput进行单元测试时。好像没用。
但不幸的是,在你的情况下 TopologyTestDriver 不是kafka streams引擎在运行时如何工作的100%精确模型。值得注意的是,新传入事件的处理顺序存在一些差异。
这确实会在尝试测试(例如)某些联接时引起问题,因为这些操作依赖于特定的处理顺序(例如,在流表联接中,表应该在'alice'的流端事件到达之前已经有了键'alice'的条目,否则流端'alice'的连接输出将不包括任何表端数据)。
所以我一直在寻找解决这个问题的办法,但找不到。
我建议使用测试来启动一个嵌入式kafka集群,然后使用“真正的”kafka流引擎(即,不是 TopologyTestDriver ). 实际上,这意味着您正在将您的测试从单元测试更改为集成/系统测试:您的测试将启动一个完整的kafka流拓扑,该拓扑与运行在与您的测试相同的机器上的嵌入式kafka集群进行通信。
请参阅apachekafka项目中的kafka流集成测试,其中 EmbeddedKafkaCluster 以及 IntegrationTestUtils 是工具的中心件。连接的具体测试示例如下 StreamTableJoinIntegrationTest (有一些与连接相关的集成测试)及其父级 AbstractJoinIntegrationTest . (值得一提的是,这里有更多的集成测试示例https://github.com/confluentinc/kafka-streams-examples#examples-集成测试,其中包括在使用apacheavro作为数据格式时还涉及合流模式注册表的测试,等等)
但是,除非我弄错了,否则集成测试及其工具不包括在kafka streams的testutilities工件中(即。, org.apache.kafka:kafka-streams-test-utils ). 所以你必须复制粘贴到你自己的代码库中。

u2nhd7ah

u2nhd7ah2#

你看过Kafka流单元测试吗?它是关于管道数据和检查最终结果与模拟处理器。
例如,对于以下流连接:

stream1 = builder.stream(topic1, consumed);
        stream2 = builder.stream(topic2, consumed);
        joined = stream1.outerJoin(
            stream2,
            MockValueJoiner.TOSTRING_JOINER,
            JoinWindows.of(ofMillis(100)),
            StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String()));
        joined.process(supplier);

然后,您可以开始将输入项管道化到第一个或第二个主题中,并在每个连续的输入管道中检查处理器可以检查的内容:

// push two items to the primary stream; the other window is empty
            // w1 = {}
            // w2 = {}
            // --> w1 = { 0:A0, 1:A1 }
            //     w2 = {}
            for (int i = 0; i < 2; i++) {
                inputTopic1.pipeInput(expectedKeys[i], "A" + expectedKeys[i]);
            }
            processor.checkAndClearProcessResult(EMPTY);

            // push two items to the other stream; this should produce two items
            // w1 = { 0:A0, 1:A1 }
            // w2 = {}
            // --> w1 = { 0:A0, 1:A1 }
            //     w2 = { 0:a0, 1:a1 }
            for (int i = 0; i < 2; i++) {
                inputTopic2.pipeInput(expectedKeys[i], "a" + expectedKeys[i]);
            }
            processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "A0+a0", 0),
                new KeyValueTimestamp<>(1, "A1+a1", 0));

我希望这有帮助。
参考文献:[1]https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/kstream/internals/kstreamkstreamjointest.java#l279

相关问题