beam sparkrunner,jdbcio写入tidb,没有错误消息,随着时间的推移,它停止向tidb插入数据

1hdlvixo  于 2021-05-17  发布在  Spark
关注(0)|答案(0)|浏览(274)

我使用beam spark runner(spark streaming,第二批)阅读两个kafka主题,加入并写入tidb

finalResultAppsFlyer_ID.apply(JdbcIO.<HashMap<String,String>>write()
            .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
                    Constants.JDBC_DRIVER,Constants.JDBC_URL
            )
                    .withUsername("aaaaaa")
                    .withPassword("aaaaaaaaa"))
            .withStatement("insert into user_attribution_count (process_min,process_time,process_date,project,sid,num) values (?,?,?,?,?,?)")
            .withPreparedStatementSetter(new JdbcIO.PreparedStatementSetter<HashMap<String,String>>() {
                @Override
                public void setParameters(HashMap<String,String> element, PreparedStatement preparedStatement) throws Exception {
                    UpdateTable.replaceIntoUAC(element,preparedStatement);
                }
            }));

刚开始的时候功能还不错,但是随着时间的推移,通常1-2天,插入的速度会突然变慢,几分钟后就会停止向tidb插入数据,但是应用程序仍然在运行,没有错误消息。
我使用Spark流动态定位

--conf spark.dynamicAllocation.enabled=false \
 --conf spark.streaming.dynamicAllocation.enabled=true \
 --conf spark.shuffle.service.enabled=true \
 --conf spark.streaming.dynamicAllocation.scalingInterval=300
 --conf spark.streaming.dynamicAllocation.minExecutors=3 \
 --conf spark.streaming.dynamicAllocation.maxExecutors=8 \

由于没有数据插入到tidb中,因此分配的执行器数将是最大值。当我重新启动应用程序时,会有大量的数据插入到tidb中,保持长时间的库存?然后它就会恢复正常。
我应该关闭jdbc连接还是创建连接池,如何修复它,有什么建议吗?thx

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题