测试Kafka Streams中的连接

aij0ehis  于 2023-03-07  发布在  Apache
关注(0)|答案(1)|浏览(109)

我有一个关于测试kafak流的问题,特别是测试Kafka流内部的连接。我有一个关于雇员的小用例,我想将其与保险连接。输出将是一个具有保险名称的丰富雇员。引用将是雇员中的insuranceIdx。
下面是我的用例拓扑:

public class EnrichEmployeesTopoplogy {

    public void createTopology(final StreamsBuilder builder) {

        final Serde<EnrichedEmployee> enrichedEmployeeSerde = SerdeFactory.serdeForEnrichedEmployee();
        final Serde<Insurance> insuranceSerde = SerdeFactory.serdeForInsurance();
        final Serde<Employee> employeeSerde = SerdeFactory.serdeForEmployee();

        // read the incomming employee events in a stream
        final KStream<String, Employee> employeeStream =
                builder.stream("employees", Consumed.with(Serdes.String(), employeeSerde));

        // rekey the incomming employee stream that it will have the same key as the insurance.
        final KStream<String, Employee> rekeyedEmployeeStream
                = employeeStream.selectKey((k, v) -> String.valueOf(v.getInsuranceIdx()));

        // read the incomming insurance events in a table
        final KTable<String, Insurance> insuranceTable = builder.table("insurances",
                Consumed.with(Serdes.String(), insuranceSerde));

        // create the value joiner
        ValueJoiner<Employee, Insurance, EnrichedEmployee> employeeJoiner =
                (employee, insurance) ->
                        EnrichedEmployee.builder()
                                .idx(employee.getIdx())
                                .email(employee.getEmail())
                                .insuranceName(insurance.getName())
                                .build();

        // create the joined with 
        final Joined<String, Employee, Insurance> joined = Joined.with(Serdes.String(), employeeSerde, insuranceSerde);

        rekeyedEmployeeStream.join(insuranceTable, employeeJoiner, joined)
                .to("enriched-employees", Produced.with(Serdes.String(), enrichedEmployeeSerde));

    }
}

当我用一个真实的的Kafka测试这个行为时,它会正常工作。现在我已经用kafka-streams-test-utils编写了一个单元测试
下面是我的测试代码:

class EnrichedEmployeeTopologyTest {

    private TopologyTestDriver testDriver;
    private TestInputTopic<String, Employee> inputTopicEmployees;
    private TestInputTopic<String, Insurance> inputTopicInsurances;
    private TestOutputTopic<String, EnrichedEmployee> outputTopicEnrichedEmployees;

    @BeforeEach
    void setUp() {

        final Serde<EnrichedEmployee> enrichedEmployeeSerde = SerdeFactory.serdeForEnrichedEmployee();
        final Serde<Insurance> insuranceSerde = SerdeFactory.serdeForInsurance();
        final Serde<Employee> employeeSerde = SerdeFactory.serdeForEmployee();

        final Properties streamsConfiguration = new Properties();
        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "test01");
        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:9092");

        final EnrichEmployeesTopoplogy enrichEmployeesTopoplogy = new EnrichEmployeesTopoplogy();
        final StreamsBuilder builder = new StreamsBuilder();

        enrichEmployeesTopoplogy.createTopology(builder);

        final Topology topology = builder.build();

        testDriver = new TopologyTestDriver(topology, streamsConfiguration);
        inputTopicEmployees = testDriver.createInputTopic("employees", Serdes.String().serializer(), employeeSerde.serializer());
        inputTopicInsurances = testDriver.createInputTopic("insurances", Serdes.String().serializer(), insuranceSerde.serializer());
        outputTopicEnrichedEmployees = testDriver.createOutputTopic("enriched-employees", Serdes.String().deserializer(), enrichedEmployeeSerde.deserializer());

    }

    @AfterEach
    void tearDown() {
        testDriver.close();
    }

    @Test
    void createTopology() {
        final String employeeIdx = UUID.randomUUID().toString();
        final String insuranceIdx = UUID.randomUUID().toString();

        final Employee employee = Employee.builder().idx(employeeIdx).email("foo1@bar.de").insuranceIdx(insuranceIdx).build();
        final Insurance insurance = Insurance.builder().idx(insuranceIdx).name("insurance 01").build();

        inputTopicEmployees.pipeInput(employee.getIdx(), employee);
        inputTopicInsurances.pipeInput(insurance.getIdx(), insurance);

        Assertions.assertEquals(true, !outputTopicEnrichedEmployees.isEmpty());
    }
}

结果将是Assert总是失败。
我试着用流配置值做了一些实验。但是什么都不会改变。我的假设是这可能是测试中的一个计时问题。有人能帮我解决这个问题吗?
最佳问候托马斯

4c8rllxm

4c8rllxm1#

在Kafka流-流表连接只触发新的传入KStream记录;传入的KTable记录只会更新KTable。2在你的测试中,你能不能切换调用的顺序,先执行inputTopicInsurances.pipeInput,然后执行inputTopicEmployees.pipeInput方法,看看是否有效?

相关问题