动态调用flink运算符

j7dteeu8  于 2021-06-24  发布在  Flink
关注(0)|答案(2)|浏览(343)

我最近开始学习流处理,现在正在尝试apache flink。我正在尝试编写一个作业,从kafka主题中读取事件,可能执行一些无状态链式转换,并对另一个应用程序进行rest调用以发布每个转换的事件。例如,我的main方法可以如下所示-

public class KafkaSourceToRestSinkJob {
    public static void main(String[] args) {
        String configPath = args[0];
        //Read configuration for the job (like kafka properties, rest uri for sink, possibly operators to invoke)
        ...
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> dataStream = env.addSource(new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), kafkaProps));
        dataStream.addSink(new RestSinkFunction<>()); //Custom sink function implementing org.apache.flink.streaming.api.functions.sink.SinkFunction
        //Chain some operators depending on some parameters in the config file
        ...
        env.execute("Confused Job");
    }
}

我的目标是为具有相同源和汇类型的多个作业提供一个公共jar工件。如果我需要一个作业来执行转换a、b和c(实现将出现在jar中),我可以在配置文件中指定它们,并在程序args中传递文件的路径。
下面是我的问题-
是否可以动态调用运算符?
我知道在接收器中进行rest调用可能会导致一些不必要的延迟,但在我的应用程序中,这是可以忍受的。我也不在乎回应。记住这一点,我有理由避免休息池吗?
总的来说,我是不是大错特错了?
谢谢您!

q3qa4bjr

q3qa4bjr1#

我可能会看看flink sql。您可以定义公共源/汇,然后将一个sql查询传递给flink。
我在过去使用sparksql时也有类似的设置,它运行得相当好。您不需要发明自己的规范语言,而且它更容易理解。

g9icjywg

g9icjywg2#

不能动态修改作业图的拓扑,但是可以实现一个flatmap操作符,动态加载一个类(在配置中指定),然后使用它来转换事件流。
至于rest sink,如果您需要保证端到端的一次语义,那么您需要小心地将sink与flink的检查点相匹配。flinkkafkaconsumer通过倒带和重放自上一个检查点以来的事件来处理恢复。如果不小心,这将导致在恢复期间重复的结果被推送到rest接收器。如果rest sink只在外部系统上执行幂等更新,那么这很好,但是您需要使rest sink具有状态和事务性。

相关问题