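I have created a Flink application that reads from DB2. When I run it as a plain Java application, I get the data. When I run the same application as a Flink job (submitted through the Flink dashboard), I cannot see the data. The job overview shows the number of records sent and received, and that count matches the number of rows in my database. The job also finishes without any error or exception.
Here is my code snippet: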
public static void main(String[] args) {
    logger.info("****************************************************************************");
    try {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        TypeInformation<?>[] fieldTypes = new TypeInformation<?>[] {
            BasicTypeInfo.STRING_TYPE_INFO
        };
        RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes);
        logger.info("****************************************************************************");
        String user = "XXXXXX";
        logger.info("******Before Query");
        JDBCInputFormat jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
            .setDrivername("com.ibm.db2.jcc.DB2Driver")
            .setDBUrl("jdbc:db2://XXXX-XXX-XXXX-XX-XXX-X.XXX.XX.XXXX.XXX:XXXX/XXXX")
            .setUsername("XXXX").setPassword("XXXX")
            .setQuery("Select firstname from <schema>.<table>")
            .setRowTypeInfo(rowTypeInfo).finish();
        logger.info("******After Query");
        DataSet<Row> albumData = env.createInput(jdbcInputFormat);
        logger.info("******After Query Execution**************");
        logger.info("***userName::: {}", user);
        logger.info("****List1: {}");
        logger.info("****List2: {}", albumData);
        logger.info("****List3: {}", (albumData == null));
        logger.info("****************************************************************************");
        logger.info("******Getting upto here**************");
        logger.info("****List4: {}", albumData.count()); // from this point not getting printed
        logger.info("****List5: {}" + albumData.collect());
        logger.info("****List6: {}" + albumData.collect().get(0));
        logger.info("***userName: {}", user);
        System.out.println("****List---: " + albumData.count());
        System.out.println("****List---: " + albumData.collect().size());
        System.out.println("****List---: " + albumData.collect().get(0));
    } catch (Exception e) {
        logger.info("******Catch**************");
        logger.info("Error Details: " + e.getMessage());
        e.printStackTrace();
    }
    logger.info("****List Final: {}");
}
Flink logs:
2020-06-02 13:04:39,913 WARN org.apache.flink.runtime.webmonitor.handlers.JarRunHandler - Configuring the job submission via query parameters is deprecated. Please migrate to submitting a JSON request instead.
2020-06-02 13:04:39,921 INFO App -****************************************************************************
2020-06-02 13:04:39,922 INFO App -****************************************************************************
2020-06-02 13:04:39,922 INFO App -******Before Query
2020-06-02 13:04:39,924 INFO App -******After Query
2020-06-02 13:04:39,924 INFO App -******After Query Execution**************
2020-06-02 13:04:39,924 INFO App -***userName::: XXXXXXX
2020-06-02 13:04:39,924 INFO App -****List1: {}
2020-06-02 13:04:39,924 INFO App -****List2: org.apache.flink.api.java.operators.DataSource@1dc1372a
2020-06-02 13:04:39,924 INFO App -****List3: false
2020-06-02 13:04:39,924 INFO App -****************************************************************************
2020-06-02 13:04:39,924 INFO App -******Getting upto here**************
2020-06-02 13:04:39,926 INFO org.apache.flink.api.java.ExecutionEnvironment - The job has 0 registered types and 0 default Kryo serializers
2020-06-02 13:04:40,092 INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher - Received JobGraph submission XXXXXXXXXXXXXXXXX (Flink Java Job at Tue Jun 02 13:04:39 UTC 2020).
2020-06-02 13:04:40,092 INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher - Submitting job XXXXXXXXXXXXXXXXX (Flink Java Job at Tue Jun 02 13:04:39 UTC 2020).
2020-06-02 13:04:40,093 INFO org.apache.flink.runtime.rpc.akka.AkkaRpcService - Starting RPC endpoint for org.apache.flink.runtime.jobmaster.JobMaster at akka://flink/user/jobmanager_69 .
2020-06-02 13:04:40,093 INFO org.apache.flink.runtime.jobmaster.JobMaster - Initializing job Flink Java Job at Tue Jun 02 13:04:39 UTC 2020 (XXXXXXXXXXXXXXXXX).
2020-06-02 13:04:40,094 INFO org.apache.flink.runtime.jobmaster.JobMaster - Using restart back off time strategy NoRestartBackoffTimeStrategy for Flink Java Job at Tue Jun 02 13:04:39 UTC 2020 (XXXXXXXXXXXXXXXXX).
2020-06-02 13:04:40,098 INFO org.apache.flink.runtime.jobmaster.JobMaster - Running initialization on master for job Flink Java Job at Tue Jun 02 13:04:39 UTC 2020 (XXXXXXXXXXXXXXXXX).
2020-06-02 13:04:40,104 INFO org.apache.flink.runtime.jobmaster.JobMaster - Successfully ran initialization on master in 6 ms.
2020-06-02 13:04:40,105 INFO org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionStrategy - Start building failover regions.
2020-06-02 13:04:40,105 INFO org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionStrategy - Created 1 failover regions.
2020-06-02 13:04:40,105 INFO org.apache.flink.runtime.jobmaster.JobMaster - Using failover strategy org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionStrategy@6e2a60f7 for Flink Java Job at Tue Jun 02 13:04:39 UTC 2020 (XXXXXXXXXXXXXXXXX).
2020-06-02 13:04:40,106 INFO org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl - JobManager runner for job Flink Java Job at Tue Jun 02 13:04:39 UTC 2020 (XXXXXXXXXXXXXXXXX) was granted leadership with session id 00000000-0000-0000-0000-000000000000 at akka.tcp://flink@flink-jobmanager:<port>/user/jobmanager_69.
2020-06-02 13:04:40,106 INFO org.apache.flink.runtime.jobmaster.JobMaster - Starting execution of job Flink Java Job at Tue Jun 02 13:04:39 UTC 2020 (XXXXXXXXXXXXXXXXX) under job master id 00000000000000000000000000000000.
2020-06-02 13:04:40,106 INFO org.apache.flink.runtime.jobmaster.JobMaster - Starting scheduling with scheduling strategy [org.apache.flink.runtime.scheduler.strategy.LazyFromSourcesSchedulingStrategy]
2020-06-02 13:04:40,106 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job Flink Java Job at Tue Jun 02 13:04:39 UTC 2020 (XXXXXXXXXXXXXXXXX) switched from state CREATED to RUNNING.
2020-06-02 13:04:40,106 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - DataSource (at createInput(ExecutionEnvironment.java:576) (org.apache.flink.api.java.io.jdbc.JDBCInputFormat)) (1/1) (0000000000000000000) switched from CREATED to SCHEDULED.
2020-06-02 13:04:40,106 INFO org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl - Cannot serve slot request, no ResourceManager connected. Adding as pending request [SlotRequestId{XXXXXXXXXXXXXXXXXXX}]
2020-06-02 13:04:40,106 INFO org.apache.flink.runtime.jobmaster.JobMaster - Connecting to ResourceManager akka.tcp://flink@flink-jobmanager:<port>/user/resourcemanager(00000000000000000000000000000000)
2020-06-02 13:04:40,107 INFO org.apache.flink.runtime.jobmaster.JobMaster - Resolved ResourceManager address, beginning registration
2020-06-02 13:04:40,107 INFO org.apache.flink.runtime.jobmaster.JobMaster - Registration at ResourceManager attempt 1 (timeout=100ms)
2020-06-02 13:04:40,107 INFO org.apache.flink.runtime.resourcemanager.StandaloneResourceManager - Registering job manager 00000000000000000000000000000000@akka.tcp://flink@flink-jobmanager:6123/user/jobmanager_69 for job XXXXXXXXXXXXXXXXX.
2020-06-02 13:04:40,108 INFO org.apache.flink.runtime.resourcemanager.StandaloneResourceManager - Registered job manager 00000000000000000000000000000000@akka.tcp://flink@flink-jobmanager:6123/user/jobmanager_69 for job XXXXXXXXXXXXXXXXX.
2020-06-02 13:04:40,108 INFO org.apache.flink.runtime.jobmaster.JobMaster - JobManager successfully registered at ResourceManager, leader id: 00000000000000000000000000000000.
2020-06-02 13:04:40,108 INFO org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl - Requesting new slot [SlotRequestId{XXXXXXXXXXXXXXXXXXX}] and profile ResourceProfile{UNKNOWN} from resource manager.
2020-06-02 13:04:40,108 INFO org.apache.flink.runtime.resourcemanager.StandaloneResourceManager - Request slot with profile ResourceProfile{UNKNOWN} for job XXXXXXXXXXXXXXXXX with allocation id 8aace7baac2c4a369a63465266a36d42.
2020-06-02 13:04:40,127 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - DataSource (at createInput(ExecutionEnvironment.java:576) (org.apache.flink.api.java.io.jdbc.JDBCInputFormat)) (1/1) (0000000000000000000) switched from SCHEDULED to DEPLOYING.
2020-06-02 13:04:40,127 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying DataSource (at createInput(ExecutionEnvironment.java:576) (org.apache.flink.api.java.io.jdbc.JDBCInputFormat)) (1/1) (attempt #0) to 597a4b7abffd5a4183d22383754bcae3 @ flink-taskmanager-XXXXXX-XXX(dataPort=0000)
2020-06-02 13:04:40,313 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - DataSource (at createInput(ExecutionEnvironment.java:576) (org.apache.flink.api.java.io.jdbc.JDBCInputFormat)) (1/1) (0000000000000000000) switched from DEPLOYING to RUNNING.
2020-06-02 13:04:42,359 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - DataSink (collect()) (1/1) (d1b4433ab70cd69bbf00e787dd2e02b5) switched from CREATED to SCHEDULED.
2020-06-02 13:04:42,360 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - DataSink (collect()) (1/1) (d1b4433ab70cd69bbf00e787dd2e02b5) switched from SCHEDULED to DEPLOYING.
2020-06-02 13:04:42,360 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying DataSink (collect()) (1/1) (attempt #0) to 597a4b7abffd5a4183d22383754bcae3 @ flink-taskmanager-XXXXX-XXX(dataPort=0000)
2020-06-02 13:04:42,372 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - DataSource (at createInput(ExecutionEnvironment.java:576) (org.apache.flink.api.java.io.jdbc.JDBCInputFormat)) (1/1) (0000000000000000000) switched from RUNNING to FINISHED.
2020-06-02 13:04:42,372 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - DataSink (collect()) (1/1) (d1b4433ab70cd69bbf00e787dd2e02b5) switched from DEPLOYING to RUNNING.
2020-06-02 13:04:42,373 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - DataSink (collect()) (1/1) (d1b4433ab70cd69bbf00e787dd2e02b5) switched from RUNNING to FINISHED.
2020-06-02 13:04:42,374 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job Flink Java Job at Tue Jun 02 13:04:39 UTC 2020 (XXXXXXXXXXXXXXXXX) switched from state RUNNING to FINISHED.
2020-06-02 13:04:42,375 INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher - Job XXXXXXXXXXXXXXXXX reached globally terminal state FINISHED.
2020-06-02 13:04:42,380 INFO org.apache.flink.runtime.jobmaster.JobMaster - Stopping the JobMaster for job Flink Java Job at Tue Jun 02 13:04:39 UTC 2020(XXXXXXXXXXXXXXXXX).
2020-06-02 13:04:42,385 INFO org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl - Suspending SlotPool.
2020-06-02 13:04:42,385 INFO org.apache.flink.runtime.jobmaster.JobMaster - Close ResourceManager connection XXXXXXXXXXXXXXXXX: JobManager is shutting down..
2020-06-02 13:04:42,385 INFO org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl - Stopping SlotPool.
2020-06-02 13:04:42,385 INFO org.apache.flink.runtime.resourcemanager.StandaloneResourceManager - Disconnect job manager 00000000000000000000000000000000@akka.tcp://flink@flink-jobmanager:<port>/user/jobmanager_69 for job XXXXXXXXXXXXXXXXX from the resource manager.
2020-06-02 13:04:42,385 INFO org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl - JobManagerRunner already shutdown.