Flink: difference between TableResult#print and DataStream#print

2ledvvac posted on 2023-03-16 in Apache

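I'm using Flink 1.14. For the code snippet below, if I want to print the results, it seems that both TableResult#print and DataStream#print work, but I see different behavior between them: TableResult#print prints the results every 3 minutes, which seems related to the checkpoint interval, while DataStream#print prints the results every 10 seconds. I'm curious what is happening underneath.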

StreamExecutionEnvironment env = ...;
env.setParallelism(1);
env.enableCheckpointing(3 * 60000);
StreamTableEnvironment tableEnv = ...;
// the mock source produces one message every 10 seconds
SingleOutputStreamOperator<Tuple3<Integer, String, Integer>> ds = ...;
Table table = tableEnv.fromDataStream(ds, schema);

// this prints every 3 minutes
tableEnv.createTemporaryView("foobar", table);
tableEnv.executeSql("select f0, f1, f2 from foobar").print();

// this prints every 10 seconds
//tableEnv.toDataStream(table).print();
//env.execute();
Answer 1 (wsxa1bj1):

"...while DataStream#print prints the results every 10 seconds. I'm curious what is happening underneath."
This is because DataStream.print() actually treats printing as a sink:

/**
 * Writes a DataStream to the standard output stream (stdout).
 *
 * <p>For each element of the DataStream the result of {@link Object#toString()} is written.
 *
 * <p>NOTE: This will print to stdout on the machine where the code is executed, i.e. the Flink
 * worker.
 *
 * @return The closed DataStream.
 */
@PublicEvolving
public DataStreamSink<T> print() {
    PrintSinkFunction<T> printFunction = new PrintSinkFunction<>();
    return addSink(printFunction).name("Print to Std. Out");
}

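So you see a result written out every 10 seconds because your source emits a new message at that same rate: each message flows through the stream and is eventually written to the sink, which simply prints it to standard output.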
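To make the per-record behavior concrete, here is a minimal pure-Java sketch. It is not Flink code: `PrintSinkModel` and `visibleTimes` are hypothetical names, and time is simulated in seconds. It models a source that emits one record every 10 seconds and a sink that writes each record the moment it arrives, the way PrintSinkFunction writes each element's toString().

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the DataStream#print path (not Flink code):
// every record reaches the sink as soon as it is produced and is written out at once.
public class PrintSinkModel {

    // Returns the simulated time (in seconds) at which each record appears on stdout.
    public static List<Long> visibleTimes(int records, long emitEverySeconds) {
        List<Long> times = new ArrayList<>();
        for (int i = 1; i <= records; i++) {
            long producedAt = i * emitEverySeconds; // mock source: one record per period
            String record = "record-" + i;
            // The sink writes immediately, analogous to PrintSinkFunction printing toString().
            System.out.println("t=" + producedAt + "s " + record);
            times.add(producedAt); // visible at the same moment it was produced
        }
        return times;
    }

    public static void main(String[] args) {
        System.out.println(visibleTimes(3, 10));
    }
}
```

Calling `visibleTimes(3, 10)` prints three lines stamped t=10s, t=20s and t=30s and returns [10, 20, 30]: the output rate simply follows the source rate, with no dependency on checkpointing.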
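"TableResult#print prints the results every 3 minutes, which seems related to the checkpoint interval."
You're correct here as well. If you look at TableResult.print(), its behavior is closely tied to checkpointing, as described in the interface documentation below: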

/**
 * Print the result contents as tableau form to client console.
 *
 * <p>This method has slightly different behaviors under different checkpointing settings (to
 * enable checkpointing for a streaming job, set checkpointing properties through {@link
 * TableConfig}).
 *
 * <ul>
 *   <li>For batch jobs or streaming jobs without checkpointing, this method has neither
 *       exactly-once nor at-least-once guarantee. Query results are immediately accessible by
 *       the clients once they're produced, but exceptions will be thrown when the job fails and
 *       restarts.
 *   <li>For streaming jobs with exactly-once checkpointing, this method guarantees an
 *       end-to-end exactly-once record delivery. A result will be accessible by clients only
 *       after its corresponding checkpoint completes.
 *   <li>For streaming jobs with at-least-once checkpointing, this method guarantees an
 *       end-to-end at-least-once record delivery. Query results are immediately accessible by
 *       the clients once they're produced, but it is possible for the same result to be
 *       delivered multiple times.
 * </ul>
 *
 * <p>In order to fetch result to local, you can call either {@link #collect()} and {@link
 * #print()}. But, they can't be called both on the same {@link TableResult} instance, because
 * the result can only be accessed once.
 */
void print();
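Note that enableCheckpointing(long) uses exactly-once mode by default, so the job in the question falls under the second bullet. The following minimal pure-Java sketch models when each row becomes visible to the client. It is not Flink code: `CollectVisibilityModel`, `Mode` and `visibleTimes` are hypothetical names, and it assumes, as a simplification, that checkpoints complete instantly at every multiple of the checkpoint interval.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of when TableResult#print can show a row (not Flink code).
// Assumptions: one row every emitEverySeconds; checkpoints complete instantly at every
// multiple of checkpointIntervalSeconds; in exactly-once mode a row is released when the
// first checkpoint at or after its production time completes.
public class CollectVisibilityModel {

    public enum Mode { NO_CHECKPOINTING, AT_LEAST_ONCE, EXACTLY_ONCE }

    public static List<Long> visibleTimes(Mode mode, int records, long emitEverySeconds,
                                          long checkpointIntervalSeconds) {
        List<Long> times = new ArrayList<>();
        for (int i = 1; i <= records; i++) {
            long producedAt = i * emitEverySeconds;
            if (mode == Mode.EXACTLY_ONCE) {
                // Round up to the next checkpoint boundary: the row is held back until then.
                long checkpoint = ((producedAt + checkpointIntervalSeconds - 1)
                        / checkpointIntervalSeconds) * checkpointIntervalSeconds;
                times.add(checkpoint);
            } else {
                // No checkpointing or at-least-once: the row is accessible as soon as it is produced.
                times.add(producedAt);
            }
        }
        return times;
    }

    public static void main(String[] args) {
        // 19 rows at 10 s intervals with a 3-minute (180 s) checkpoint interval.
        System.out.println(visibleTimes(Mode.EXACTLY_ONCE, 19, 10, 180));
        System.out.println(visibleTimes(Mode.AT_LEAST_ONCE, 19, 10, 180));
    }
}
```

With these inputs, the exactly-once run releases the first 18 rows (produced at 10 s through 180 s) together at 180 s and the 19th at 360 s, while the at-least-once run releases each row at its production time. This matches the 3-minute versus 10-second pattern in the question. Per the Javadoc above, keeping checkpointing but switching to at-least-once (for example `env.enableCheckpointing(3 * 60000, CheckpointingMode.AT_LEAST_ONCE);`) should make rows visible immediately, at the cost of possible duplicates after a failure; real checkpoints also take some time to complete, which the model ignores.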
