java—如何并行运行tableapi和datastreamapi以使用python定义的函数

amrnrhlw  于 2021-06-21  发布在  Flink
关注(0)|答案(0)|浏览(226)

我有一个数据流api的工作,这是运行良好,但我需要使用 DataStream<Event> 并将其传递给tableapi以调用register python函数,然后将结果传递回新的 DataStream 重新处理调用的结果。我这里有两个问题,一个是我可以这样运行作业:

/*DataStream Job*/
StreamExecutionEnvironment env = EnvironmentConfiguration.getEnv();
final DataStream<Event> eventsStream = RabbitMQConnector.eventStreamObject(env)
                .flatMap(new RabbitMQConsumer())
                .uid("cep.objects_mapper_id")
                .name("Event Mapper")
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                        .withTimestampAssigner((event, timestamp) -> event.timestamp.getTime()))
                .name("Watermarks Added");

/*TableAPI job*/
final StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(TableEnvironmentConfiguration.getEnv(), fsSettings);
                    fsTableEnv.getConfig().getConfiguration().setString("python.files", "test.py");
                    fsTableEnv.getConfig().getConfiguration().setString("python.client.executable", "python.exe");
                    fsTableEnv.getConfig().getConfiguration().setString("python.executable", "python.exe");
                    fsTableEnv.executeSql("CREATE TEMPORARY SYSTEM FUNCTION func1 AS 'test.func1' LANGUAGE PYTHON");

SingleOutputStreamOperator<String> stream = eventsStream.map(x -> x.name);

Table source = fsTableEnv.fromDataStream(stream).as("name");
Table result = source.select("func1(name)");

DataStream<String> finalRes = fsTableEnv.toAppendStream(result, String.class);
finalRes.addSink(new SinkFunction<String>() {
                        @Override
                        public void invoke(String value) {
                            LOG.info("Record from table: " + value);
                        }
                    });

env.execute(job_name);

在这个例子中,我没有任何问题,但是python函数永远不会返回,我担心在我返回之前它永远不会被调用 result.exeute(); ,然后当我从上面和后面应用相同的例子时

finalRes.addSink(new SinkFunction<String>() {
                        @Override
                        public void invoke(String value) {
                            LOG.info("Record from table: " + value);
                        }
                    });

result.execute(); 为了执行表,python函数可以工作,但是datastreamapi作业在tableapi完成之前永远不会执行,但是由于datastreamapi作业从未初始化,因此使用者不工作,因此应该发送到tableapi然后发送到python函数的流总是空的。
我的问题是:有没有办法并行运行两个作业,或者一个接一个地运行?注意:我会创建一个timertask,在datastreamapi作业启动后等待一段时间,然后启动tableapi作业(使用parallelism 1),它似乎可以工作,但是tableapi作业被创建并停止了很多次。
有没有更好的办法?希望有人能理解我的问题。
谢谢!

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题