Java Flink application cannot print values captured from DB2

Posted by l2osamch on 2021-06-26 in Flink

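I created a Flink DB2 application. When I run this Flink job as a plain Java application, I get the data. But when I run the same application as a Flink job (after submitting it through the Flink dashboard), I cannot see the data. I checked the job overview, which reports the number of records fetched and received. That count matches the number of rows in my database, and I get no errors or exceptions when the Flink job runs.
Below is my code snippet: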

public static void main(String[] args) {
    logger.info("****************************************************************************");
    try {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        TypeInformation<?>[] fieldTypes = new TypeInformation<?>[] {
                BasicTypeInfo.STRING_TYPE_INFO
        };
        RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes);
        logger.info("****************************************************************************");

        String user = "XXXXXX";
        logger.info("******Before Query");
        JDBCInputFormat jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
                .setDrivername("com.ibm.db2.jcc.DB2Driver")
                .setDBUrl("jdbc:db2://XXXX-XXX-XXXX-XX-XXX-X.XXX.XX.XXXX.XXX:XXXX/XXXX")
                .setUsername("XXXX").setPassword("XXXX")
                .setQuery("Select firstname from <schema>.<table>")

                .setRowTypeInfo(rowTypeInfo).finish();
        logger.info("******After Query");
        DataSet<Row> albumData = env.createInput(jdbcInputFormat);
        logger.info("******After Query Execution**************");
        logger.info("***userName::: {}", user);
        logger.info("****List1: {}");
        logger.info("****List2: {}", albumData);
        logger.info("****List3: {}", (albumData == null));
        logger.info("****************************************************************************");
        logger.info("******Getting upto here**************");
        logger.info("****List4: {}", albumData.count()); // from this point not getting printed
        logger.info("****List5: {}", albumData.collect());
        logger.info("****List6: {}", albumData.collect().get(0));
        logger.info("***userName: {}", user);
        System.out.println("****List---: " + albumData.count());
        System.out.println("****List---: " + albumData.collect().size());
        System.out.println("****List---: " + albumData.collect().get(0));
    } catch (Exception e) {
        logger.info("******Catch**************");
        logger.info("Error Details: " + e.getMessage());
        e.printStackTrace();
    }
    logger.info("****List Final: {}");
}

Flink log:

2020-06-02 13:04:39,913 WARN  org.apache.flink.runtime.webmonitor.handlers.JarRunHandler    - Configuring the job submission via query parameters is deprecated. Please migrate to submitting a JSON request instead.
2020-06-02 13:04:39,921 INFO  App                                                           -****************************************************************************
2020-06-02 13:04:39,922 INFO  App                                                           -****************************************************************************
2020-06-02 13:04:39,922 INFO  App                                                           -******Before Query
2020-06-02 13:04:39,924 INFO  App                                                           -******After Query
2020-06-02 13:04:39,924 INFO  App                                                           -******After Query Execution**************
2020-06-02 13:04:39,924 INFO  App                                                           -***userName::: XXXXXXX
2020-06-02 13:04:39,924 INFO  App                                                           -****List1: {}
2020-06-02 13:04:39,924 INFO  App                                                           -****List2: org.apache.flink.api.java.operators.DataSource@1dc1372a
2020-06-02 13:04:39,924 INFO  App                                                           -****List3: false
2020-06-02 13:04:39,924 INFO  App                                                           -****************************************************************************
2020-06-02 13:04:39,924 INFO  App                                                           -******Getting upto here**************
2020-06-02 13:04:39,926 INFO  org.apache.flink.api.java.ExecutionEnvironment                - The job has 0 registered types and 0 default Kryo serializers
2020-06-02 13:04:40,092 INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher      - Received JobGraph submission XXXXXXXXXXXXXXXXX (Flink Java Job at Tue Jun 02 13:04:39 UTC 2020).
2020-06-02 13:04:40,092 INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher      - Submitting job XXXXXXXXXXXXXXXXX (Flink Java Job at Tue Jun 02 13:04:39 UTC 2020).
2020-06-02 13:04:40,093 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService              - Starting RPC endpoint for org.apache.flink.runtime.jobmaster.JobMaster at akka://flink/user/jobmanager_69 .
2020-06-02 13:04:40,093 INFO  org.apache.flink.runtime.jobmaster.JobMaster                  - Initializing job Flink Java Job at Tue Jun 02 13:04:39 UTC 2020 (XXXXXXXXXXXXXXXXX).
2020-06-02 13:04:40,094 INFO  org.apache.flink.runtime.jobmaster.JobMaster                  - Using restart back off time strategy NoRestartBackoffTimeStrategy for Flink Java Job at Tue Jun 02 13:04:39 UTC 2020 (XXXXXXXXXXXXXXXXX).
2020-06-02 13:04:40,098 INFO  org.apache.flink.runtime.jobmaster.JobMaster                  - Running initialization on master for job Flink Java Job at Tue Jun 02 13:04:39 UTC 2020 (XXXXXXXXXXXXXXXXX).
2020-06-02 13:04:40,104 INFO  org.apache.flink.runtime.jobmaster.JobMaster                  - Successfully ran initialization on master in 6 ms.
2020-06-02 13:04:40,105 INFO  org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionStrategy  - Start building failover regions.
2020-06-02 13:04:40,105 INFO  org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionStrategy  - Created 1 failover regions.
2020-06-02 13:04:40,105 INFO  org.apache.flink.runtime.jobmaster.JobMaster                  - Using failover strategy org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionStrategy@6e2a60f7 for Flink Java Job at Tue Jun 02 13:04:39 UTC 2020 (XXXXXXXXXXXXXXXXX).
2020-06-02 13:04:40,106 INFO  org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl       - JobManager runner for job Flink Java Job at Tue Jun 02 13:04:39 UTC 2020 (XXXXXXXXXXXXXXXXX) was granted leadership with session id 00000000-0000-0000-0000-000000000000 at akka.tcp://flink@flink-jobmanager:<port>/user/jobmanager_69.
2020-06-02 13:04:40,106 INFO  org.apache.flink.runtime.jobmaster.JobMaster                  - Starting execution of job Flink Java Job at Tue Jun 02 13:04:39 UTC 2020 (XXXXXXXXXXXXXXXXX) under job master id 00000000000000000000000000000000.
2020-06-02 13:04:40,106 INFO  org.apache.flink.runtime.jobmaster.JobMaster                  - Starting scheduling with scheduling strategy [org.apache.flink.runtime.scheduler.strategy.LazyFromSourcesSchedulingStrategy]
2020-06-02 13:04:40,106 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job Flink Java Job at Tue Jun 02 13:04:39 UTC 2020 (XXXXXXXXXXXXXXXXX) switched from state CREATED to RUNNING.
2020-06-02 13:04:40,106 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - DataSource (at createInput(ExecutionEnvironment.java:576) (org.apache.flink.api.java.io.jdbc.JDBCInputFormat)) (1/1) (0000000000000000000) switched from CREATED to SCHEDULED.
2020-06-02 13:04:40,106 INFO  org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl      - Cannot serve slot request, no ResourceManager connected. Adding as pending request [SlotRequestId{XXXXXXXXXXXXXXXXXXX}]
2020-06-02 13:04:40,106 INFO  org.apache.flink.runtime.jobmaster.JobMaster                  - Connecting to ResourceManager akka.tcp://flink@flink-jobmanager:<port>/user/resourcemanager(00000000000000000000000000000000)
2020-06-02 13:04:40,107 INFO  org.apache.flink.runtime.jobmaster.JobMaster                  - Resolved ResourceManager address, beginning registration
2020-06-02 13:04:40,107 INFO  org.apache.flink.runtime.jobmaster.JobMaster                  - Registration at ResourceManager attempt 1 (timeout=100ms)
2020-06-02 13:04:40,107 INFO  org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  - Registering job manager 00000000000000000000000000000000@akka.tcp://flink@flink-jobmanager:6123/user/jobmanager_69 for job XXXXXXXXXXXXXXXXX.
2020-06-02 13:04:40,108 INFO  org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  - Registered job manager 00000000000000000000000000000000@akka.tcp://flink@flink-jobmanager:6123/user/jobmanager_69 for job XXXXXXXXXXXXXXXXX.
2020-06-02 13:04:40,108 INFO  org.apache.flink.runtime.jobmaster.JobMaster                  - JobManager successfully registered at ResourceManager, leader id: 00000000000000000000000000000000.
2020-06-02 13:04:40,108 INFO  org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl      - Requesting new slot [SlotRequestId{XXXXXXXXXXXXXXXXXXX}] and profile ResourceProfile{UNKNOWN} from resource manager.
2020-06-02 13:04:40,108 INFO  org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  - Request slot with profile ResourceProfile{UNKNOWN} for job XXXXXXXXXXXXXXXXX with allocation id 8aace7baac2c4a369a63465266a36d42.
2020-06-02 13:04:40,127 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - DataSource (at createInput(ExecutionEnvironment.java:576) (org.apache.flink.api.java.io.jdbc.JDBCInputFormat)) (1/1) (0000000000000000000) switched from SCHEDULED to DEPLOYING.
2020-06-02 13:04:40,127 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Deploying DataSource (at createInput(ExecutionEnvironment.java:576) (org.apache.flink.api.java.io.jdbc.JDBCInputFormat)) (1/1) (attempt #0) to 597a4b7abffd5a4183d22383754bcae3 @ flink-taskmanager-XXXXXX-XXX(dataPort=0000)
2020-06-02 13:04:40,313 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - DataSource (at createInput(ExecutionEnvironment.java:576) (org.apache.flink.api.java.io.jdbc.JDBCInputFormat)) (1/1) (0000000000000000000) switched from DEPLOYING to RUNNING.
2020-06-02 13:04:42,359 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - DataSink (collect()) (1/1) (d1b4433ab70cd69bbf00e787dd2e02b5) switched from CREATED to SCHEDULED.
2020-06-02 13:04:42,360 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - DataSink (collect()) (1/1) (d1b4433ab70cd69bbf00e787dd2e02b5) switched from SCHEDULED to DEPLOYING.
2020-06-02 13:04:42,360 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Deploying DataSink (collect()) (1/1) (attempt #0) to 597a4b7abffd5a4183d22383754bcae3 @ flink-taskmanager-XXXXX-XXX(dataPort=0000)
2020-06-02 13:04:42,372 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - DataSource (at createInput(ExecutionEnvironment.java:576) (org.apache.flink.api.java.io.jdbc.JDBCInputFormat)) (1/1) (0000000000000000000) switched from RUNNING to FINISHED.
2020-06-02 13:04:42,372 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - DataSink (collect()) (1/1) (d1b4433ab70cd69bbf00e787dd2e02b5) switched from DEPLOYING to RUNNING.
2020-06-02 13:04:42,373 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - DataSink (collect()) (1/1) (d1b4433ab70cd69bbf00e787dd2e02b5) switched from RUNNING to FINISHED.
2020-06-02 13:04:42,374 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job Flink Java Job at Tue Jun 02 13:04:39 UTC 2020 (XXXXXXXXXXXXXXXXX) switched from state RUNNING to FINISHED.
2020-06-02 13:04:42,375 INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher      - Job XXXXXXXXXXXXXXXXX reached globally terminal state FINISHED.
2020-06-02 13:04:42,380 INFO  org.apache.flink.runtime.jobmaster.JobMaster                  - Stopping the JobMaster for job Flink Java Job at Tue Jun 02 13:04:39 UTC 2020(XXXXXXXXXXXXXXXXX).
2020-06-02 13:04:42,385 INFO  org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl      - Suspending SlotPool.
2020-06-02 13:04:42,385 INFO  org.apache.flink.runtime.jobmaster.JobMaster                  - Close ResourceManager connection XXXXXXXXXXXXXXXXX: JobManager is shutting down..
2020-06-02 13:04:42,385 INFO  org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl      - Stopping SlotPool.
2020-06-02 13:04:42,385 INFO  org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  - Disconnect job manager 00000000000000000000000000000000@akka.tcp://flink@flink-jobmanager:<port>/user/jobmanager_69 for job XXXXXXXXXXXXXXXXX from the resource manager.
2020-06-02 13:04:42,385 INFO  org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl       - JobManagerRunner already shutdown.
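
A note on the symptom: in the log, the application's own output stops right after `******Getting upto here`, and neither the `******Catch` line nor the `****List Final` line ever appears. One possible explanation, which is an assumption and not confirmed anywhere in this thread, is that when the jar is submitted through the dashboard the eager `count()` call ends `main()` with a `Throwable` that is not an `Exception`. A `catch (Exception e)` block would not intercept that, and the code after the `try` would not run either. The sketch below is plain Java with no Flink dependency; `AbortSignal` and `eagerCall` are hypothetical stand-ins used only to show the control flow, not Flink classes or methods.

```java
import java.util.ArrayList;
import java.util.List;

class CatchScopeDemo {
    // Hypothetical stand-in for an abort signal that extends Error, not Exception.
    static final class AbortSignal extends Error {
        AbortSignal() {
            super("aborted by the surrounding environment");
        }
    }

    // Hypothetical stand-in for an eager call such as count() or collect().
    static long eagerCall() {
        throw new AbortSignal();
    }

    // Mirrors the shape of main() in the question and records which steps ran.
    static List<String> run() {
        List<String> steps = new ArrayList<>();
        try {
            try {
                steps.add("before eager call");
                long n = eagerCall();
                steps.add("after eager call: " + n); // never reached
            } catch (Exception e) {
                steps.add("catch block");            // never reached: Error is not an Exception
            }
            steps.add("final log line");             // never reached
        } catch (Error outer) {
            // Only here to observe what escaped; the question's code has no such handler.
            steps.add("escaped as " + outer.getClass().getSimpleName());
        }
        return steps;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

If this assumption applies, one way to check it would be to temporarily catch `Throwable` instead of `Exception` and log the class name of whatever is caught. Whether that is what happens in this particular Flink version and setup would still need to be verified.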
