除了这个问题,我还创建了这个示例来集成 DataStreamAPI
以及 TableAPI
这次我没有错误,我有两个工作而不是一个,一个是为 DataStreamAPI
运行完美,另一个作业是为 TableAPI
它运行得也很完美,但唯一的问题是永远不会从 DataStreamAPI
,示例:
/*FILTERING NULL IDs*/
final SingleOutputStreamOperator<Event> stream_filtered = eventsStream
.filter(new NullidEventsFilterFunction())
.uid("id_filter_operator")
.name("Event Filter");
final StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(TableEnvironmentConfiguration.getEnv(), fsSettings);
SingleOutputStreamOperator<String> toTable = stream_filtered.map(x -> x.id).name("Map for table");
Table source = fsTableEnv.fromDataStream(toTable);
source.execute(); /*without this line the TableAPI job is not started, but nothing happens if is not there either*/
DataStream<String> finalRes = fsTableEnv.toAppendStream(source, String.class);
finalRes.map((MapFunction<String, String>) value -> value)
.name("Mapping after table")
.addSink(new SinkFunction<String>() {
@Override
public void invoke(String value) {
LOG.info("Record from table: " + value);
}
}).name("Sink after map from table");
/*STARTING TRANSFORMATIONS*/
Init.init(stream_filtered);
env.execute(job_name);
通过这样做,我可以在记录器中看到以下行:
INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source -> Event Mapper -> Watermarks Added -> Event Filter -> Map for table -> SourceConversion(table=[Unregistered_DataStream_5], fields=[f0]) -> SinkConversionToRow -> Sink: Select table sink (1/1) (0d3cd78d35480c44f09603786bf775e7) switched from DEPLOYING to RUNNING.
但没有收到或发出任何记录。
请参见图像以了解 DataStream
工作
看到图片了吗 TableAPI
工作
你知道吗?提前谢谢。谨致问候!
1条答案
按热度按时间j8ag8udp1#
如果您想编写一个以datastreamapi开始和结束的作业,并在中间使用表api,那么下面是一个可以构建的简单示例。
请注意,所涉及的细节在不同的版本中有所变化,这个特定的示例与Flink1.11所编写的一样有效。flip-136:正在改进datastream和tableapi之间的互操作性,使之更加容易。
您可能会担心,在web ui中,它会显示“已发送的记录:0”和“已接收的记录:0”。这是非常误导的。这些flink度量只测量flink中的记录和字节流,不报告外部系统的任何i/o。这些度量也不报告链接在一起的运算符之间的记录和字节流。这两个作业中的所有内容都是链接的,因此在这种情况下,发送/接收的记录/字节将始终为零。