在过去,i unit通过使用可插拔的Sources/Sink编写作业来测试flink作业,然后通过简单的Source-/SinkFunctions
模拟它们。
public class Example {
private static SourceFunction<String> someSource;
private static SourceFunction<String> someOtherSource;
private static SinkFunction<String> someSink;
Example(
SourceFunction<String> someSource,
SourceFunction<String> someOtherSource,
SinkFunction<String> someSink
) {
this.someSource = someSource;
this.someOtherSource = someOtherSource;
this.someSink = someSink;
}
void build(StreamExecutionEnvironment env) {
/*
... build your logic here ...
*/
}
public static void main(String[] args) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Example(
new FlinkKafkaConsumer<String>(...),
new FlinkKafkaConsumer<String>(...),
new FlinkKafkaProducer<String>(...)
).build(env);
env.execute();
}
}
这样,我可以很容易地测试整个工作,只是交换真实的的Kafka下沉和源与自定义Sink-/SourceFunctions
。
新的DataSources要复杂得多,因为它只是在测试用例中实现它。即使我实现了它,它也会在一个通用的地狱中结束,使它在构造函数中可注入。所以我想知道什么是最好的方法来单元测试整个工作,而不需要提出一个完整的Kafka集群。
是否有任何想法或解决方案?
1条答案
按热度按时间ao218c7q1#
你可以在
NumberSequenceSource
的基础上构建一些东西,然后再加上一个Map。FLIP-238中描述的
DataGeneratorSource
就是为了满足这种需求,它将作为1.16的一部分发布(我相信它是独立的,所以您可以复制它并立即开始使用它)。使用可插拔接收器的另一种方法是使用
DataStream#executeAndCollect()
: