我使用beam spark runner(spark streaming,第二批)阅读两个kafka主题,加入并写入tidb
finalResultAppsFlyer_ID.apply(JdbcIO.<HashMap<String,String>>write()
.withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
Constants.JDBC_DRIVER,Constants.JDBC_URL
)
.withUsername("aaaaaa")
.withPassword("aaaaaaaaa"))
.withStatement("insert into user_attribution_count (process_min,process_time,process_date,project,sid,num) values (?,?,?,?,?,?)")
.withPreparedStatementSetter(new JdbcIO.PreparedStatementSetter<HashMap<String,String>>() {
@Override
public void setParameters(HashMap<String,String> element, PreparedStatement preparedStatement) throws Exception {
UpdateTable.replaceIntoUAC(element,preparedStatement);
}
}));
刚开始的时候功能还不错,但是随着时间的推移,通常1-2天,插入的速度会突然变慢,几分钟后就会停止向tidb插入数据,但是应用程序仍然在运行,没有错误消息。
我使用Spark流动态定位
--conf spark.dynamicAllocation.enabled=false \
--conf spark.streaming.dynamicAllocation.enabled=true \
--conf spark.shuffle.service.enabled=true \
--conf spark.streaming.dynamicAllocation.scalingInterval=300
--conf spark.streaming.dynamicAllocation.minExecutors=3 \
--conf spark.streaming.dynamicAllocation.maxExecutors=8 \
由于没有数据插入到tidb中,因此分配的执行器数将是最大值。当我重新启动应用程序时,会有大量的数据插入到tidb中,保持长时间的库存?然后它就会恢复正常。
我应该关闭jdbc连接还是创建连接池,如何修复它,有什么建议吗?thx
暂无答案!
目前还没有任何答案,快来回答吧!