使用表API将作业名称设置为Flink作业

ia2d9nvy  于 2022-12-09  发布在  Apache
关注(0)|答案(1)|浏览(188)

我想为使用Table API编写的Flink应用程序设置一个作业名称,就像我使用Streaming API env.execute(jobName)所做的那样。

我想替换:

我在文档中找不到一种方法,只能在运行jar中的作业时执行此操作

bin/flink run -d -yD pipeline.name=MyPipelineName-v1.0 ...

*** Flink :**1.14.5
***环境:**Yarn
更新日期:

如果有人会面临同样的情况。我们可以添加表API管道到数据流API文档,这样做将允许我们有一个所需的作业名称设置编程。
例如:

val sinkDescriptor = TableDescriptor.forConnector("kafka")
        .option("topic","topic_out")
        .option("properties.bootstrap.servers", "localhost:9092")
        .schema(schema)
        .format(FormatDescriptor.forFormat("avro").build())
        .build()

    tEnv.createTemporaryTable("OutputTable", sinkDescriptor)

    statementSet.addInsert(sinkDescriptor, tA)
    statementSet.attachAsDataStream()

    env.execute(jobName)
y53ybaqx

y53ybaqx1#

只有StreamExecutionEnvironment会在数据流图表上呼叫setJobName

相关问题