我有一个关于测试kafak流的问题,特别是测试Kafka流内部的连接。我有一个关于雇员的小用例,我想将其与保险连接。输出将是一个具有保险名称的丰富雇员。引用将是雇员中的insuranceIdx。
下面是我的用例拓扑:
public class EnrichEmployeesTopoplogy {
public void createTopology(final StreamsBuilder builder) {
final Serde<EnrichedEmployee> enrichedEmployeeSerde = SerdeFactory.serdeForEnrichedEmployee();
final Serde<Insurance> insuranceSerde = SerdeFactory.serdeForInsurance();
final Serde<Employee> employeeSerde = SerdeFactory.serdeForEmployee();
// read the incomming employee events in a stream
final KStream<String, Employee> employeeStream =
builder.stream("employees", Consumed.with(Serdes.String(), employeeSerde));
// rekey the incomming employee stream that it will have the same key as the insurance.
final KStream<String, Employee> rekeyedEmployeeStream
= employeeStream.selectKey((k, v) -> String.valueOf(v.getInsuranceIdx()));
// read the incomming insurance events in a table
final KTable<String, Insurance> insuranceTable = builder.table("insurances",
Consumed.with(Serdes.String(), insuranceSerde));
// create the value joiner
ValueJoiner<Employee, Insurance, EnrichedEmployee> employeeJoiner =
(employee, insurance) ->
EnrichedEmployee.builder()
.idx(employee.getIdx())
.email(employee.getEmail())
.insuranceName(insurance.getName())
.build();
// create the joined with
final Joined<String, Employee, Insurance> joined = Joined.with(Serdes.String(), employeeSerde, insuranceSerde);
rekeyedEmployeeStream.join(insuranceTable, employeeJoiner, joined)
.to("enriched-employees", Produced.with(Serdes.String(), enrichedEmployeeSerde));
}
}
当我用一个真实的的Kafka测试这个行为时,它会正常工作。现在我已经用kafka-streams-test-utils编写了一个单元测试
下面是我的测试代码:
class EnrichedEmployeeTopologyTest {
private TopologyTestDriver testDriver;
private TestInputTopic<String, Employee> inputTopicEmployees;
private TestInputTopic<String, Insurance> inputTopicInsurances;
private TestOutputTopic<String, EnrichedEmployee> outputTopicEnrichedEmployees;
@BeforeEach
void setUp() {
final Serde<EnrichedEmployee> enrichedEmployeeSerde = SerdeFactory.serdeForEnrichedEmployee();
final Serde<Insurance> insuranceSerde = SerdeFactory.serdeForInsurance();
final Serde<Employee> employeeSerde = SerdeFactory.serdeForEmployee();
final Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "test01");
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:9092");
final EnrichEmployeesTopoplogy enrichEmployeesTopoplogy = new EnrichEmployeesTopoplogy();
final StreamsBuilder builder = new StreamsBuilder();
enrichEmployeesTopoplogy.createTopology(builder);
final Topology topology = builder.build();
testDriver = new TopologyTestDriver(topology, streamsConfiguration);
inputTopicEmployees = testDriver.createInputTopic("employees", Serdes.String().serializer(), employeeSerde.serializer());
inputTopicInsurances = testDriver.createInputTopic("insurances", Serdes.String().serializer(), insuranceSerde.serializer());
outputTopicEnrichedEmployees = testDriver.createOutputTopic("enriched-employees", Serdes.String().deserializer(), enrichedEmployeeSerde.deserializer());
}
@AfterEach
void tearDown() {
testDriver.close();
}
@Test
void createTopology() {
final String employeeIdx = UUID.randomUUID().toString();
final String insuranceIdx = UUID.randomUUID().toString();
final Employee employee = Employee.builder().idx(employeeIdx).email("foo1@bar.de").insuranceIdx(insuranceIdx).build();
final Insurance insurance = Insurance.builder().idx(insuranceIdx).name("insurance 01").build();
inputTopicEmployees.pipeInput(employee.getIdx(), employee);
inputTopicInsurances.pipeInput(insurance.getIdx(), insurance);
Assertions.assertEquals(true, !outputTopicEnrichedEmployees.isEmpty());
}
}
结果将是Assert总是失败。
我试着用流配置值做了一些实验。但是什么都不会改变。我的假设是这可能是测试中的一个计时问题。有人能帮我解决这个问题吗?
最佳问候托马斯
1条答案
按热度按时间4c8rllxm1#
在Kafka流-流表连接只触发新的传入KStream记录;传入的KTable记录只会更新KTable。2在你的测试中,你能不能切换调用的顺序,先执行
inputTopicInsurances.pipeInput
,然后执行inputTopicEmployees.pipeInput
方法,看看是否有效?