java—如何从字符串创建kafka使用者记录以构建junit测试用例

nwlqm0z1  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(204)

我需要一些帮助来为我的javakafka消费者构建junit测试用例。
我的原始源代码有如下方法,并且需要为相同的方法创建一个单元测试用例。

public void processConsumerRecord(ConsumerRecords<String, GenericRecord> records, boolean isEventProcessed, boolean isOffsetCommitted,
                                  int totalErrorCountFromSinkService, int totalErrorCount, Consumer<String, GenericRecord> consumer) {

...... }
我的kafka使用者正在从kafka主题提取消息,我需要能够以consumerrecords格式提供输入消息,但作为单元测试的一部分,我不会轮询来自kafka的消息,相反,模拟来自原始kafka主题的消息,并将静态输入消息提供给测试上述方法的单元测试用例,如图所示。如何以consumerrecords<string,genericrecord>的形式创建模拟输入消息?

lmyy7pcs

lmyy7pcs1#

您可以创建consumerrecord,而不是像下面这样嘲笑-

Map<TopicPartition, List<ConsumerRecord<Integer, String>>> records = new LinkedHashMap<>();

    String topic = "topic";
    records.put(new TopicPartition(topic, 0), new ArrayList<ConsumerRecord<Integer, String>>());
    ConsumerRecord<Integer, String> record1 = new ConsumerRecord<>(topic, 1, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, 1, "value1");
    ConsumerRecord<Integer, String> record2 = new ConsumerRecord<>(topic, 1, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, 2, "value2");
    records.put(new TopicPartition(topic, 1), Arrays.asList(record1, record2));
    records.put(new TopicPartition(topic, 2), new ArrayList<ConsumerRecord<Integer, String>>());

    ConsumerRecords<Integer, String> consumerRecords = new ConsumerRecords<>(records);

相关问题