使用新数据源API对Flink作业进行单元测试

tvz2xvvm  于 2022-12-09  发布在  Apache
关注(0)|答案(1)|浏览(177)

在过去,i unit通过使用可插拔的Sources/Sink编写作业来测试flink作业,然后通过简单的Source-/SinkFunctions模拟它们。

public class Example {

    private static SourceFunction<String> someSource;
    private static SourceFunction<String> someOtherSource;
    private static SinkFunction<String> someSink;

    Example(
        SourceFunction<String> someSource,
        SourceFunction<String> someOtherSource,
        SinkFunction<String> someSink
    ) {
        this.someSource = someSource;
        this.someOtherSource = someOtherSource;
        this.someSink = someSink;
    }
    
    void build(StreamExecutionEnvironment env) {
        /*
        ... build your logic here ... 
        */
    }
    
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        Example(
            new FlinkKafkaConsumer<String>(...),
            new FlinkKafkaConsumer<String>(...),
            new FlinkKafkaProducer<String>(...)
        ).build(env);
        
        env.execute();
    }
}

这样,我可以很容易地测试整个工作,只是交换真实的的Kafka下沉和源与自定义Sink-/SourceFunctions
新的DataSources要复杂得多,因为它只是在测试用例中实现它。即使我实现了它,它也会在一个通用的地狱中结束,使它在构造函数中可注入。所以我想知道什么是最好的方法来单元测试整个工作,而不需要提出一个完整的Kafka集群。
是否有任何想法或解决方案?

ao218c7q

ao218c7q1#

你可以在NumberSequenceSource的基础上构建一些东西,然后再加上一个Map。
FLIP-238中描述的DataGeneratorSource就是为了满足这种需求,它将作为1.16的一部分发布(我相信它是独立的,所以您可以复制它并立即开始使用它)。
使用可插拔接收器的另一种方法是使用DataStream#executeAndCollect()

DataStream<Integer> stream = env.fromElements(1, 2, 3);

try (CloseableIterator<Integer> results = stream.executeAndCollect()) {
    assertThat(results).containsInAnyOrder(1, 2, 3);
}

相关问题