我想为使用Table API编写的Flink应用程序设置一个作业名称,就像我使用Streaming API env.execute(jobName)
所做的那样。
我想替换:
我在文档中找不到一种方法,只能在运行jar中的作业时执行此操作
bin/flink run -d -yD pipeline.name=MyPipelineName-v1.0 ...
*** Flink :**1.14.5
***环境:**Yarn
更新日期:
如果有人会面临同样的情况。我们可以添加表API管道到数据流API文档,这样做将允许我们有一个所需的作业名称设置编程。
例如:
val sinkDescriptor = TableDescriptor.forConnector("kafka")
.option("topic","topic_out")
.option("properties.bootstrap.servers", "localhost:9092")
.schema(schema)
.format(FormatDescriptor.forFormat("avro").build())
.build()
tEnv.createTemporaryTable("OutputTable", sinkDescriptor)
statementSet.addInsert(sinkDescriptor, tA)
statementSet.attachAsDataStream()
env.execute(jobName)
1条答案
按热度按时间y53ybaqx1#
只有StreamExecutionEnvironment会在数据流图表上呼叫
setJobName
。