我在使用azuresqldbspark的javaapi插入csv文件时遇到了死锁问题。在批量插入3go(约1000万rpws)或更大的文件(有时我们插入的文件非常大,可以达到30gb)的过程中,这个问题会随机发生。我们要插入的表使用列存储索引进行索引(表的当前大小为5亿行)
似乎,死锁victme的问题是由于在插入过程中通过deltastore。我将批大小设置为1048576以避免遍历deltastore,但是我发现这个大小没有被考虑在内(可以链接到defaultmaxbufferrows和defaultmaxbuffersize,我不能在所用api的配置级别修改它们)(在spark接口捕获下面)
https://i.stack.imgur.com/as93j.png
我还想减少在数据库上启动的并行大容量插入会话的数量,但是这个api似乎不可能。唯一的解决方案是改变spark上下文的配置,这在我的例子中是不可能的,因为我只能在整个应用程序中使用一个spark上下文,因为我们使用嵌入的spark(localspark)
下面是进行插入的配置和代码
java.util.Map<String, Object> map = new java.util.HashMap<String, Object>();
map.put("url", resultProperties.dbServer);
map.put("databaseName", resultProperties.dbName);
map.put("user", resultProperties.dbUser);
map.put("password", resultProperties.dbPassword);
map.put("dbTable", "dbo.RESULT_DATA");
map.put("bulkCopyBatchSize", "1048576");
map.put("bulkCopyTableLock", "false");
map.put("bulkCopyTimeout", "600");
Map<String, Object> props = JavaConverters.mapAsScalaMapConverter(map).asScala().toMap(Predef.$conforms());
config = new com.microsoft.azure.sqldb.spark.config.SqlDBConfigBuilder(props).build();
dataFrameFunctions.bulkCopyToSqlDB(config, metadata,false);
在我的例子中不可能使用bulkcopytablelock选项,因为我可以在插入的同时在同一个表上有delete和select请求。
你知道如何在本地spark的约束下减少并行批量插入会话的数量吗?你知道我该怎么做吗,因为api考虑到了大批量,以避免通过deltastore?
我找不到其他有效的api可以使用spark进行批量插入。请告诉我,如果你有任何其他选择快速批量插入大文件。
暂无答案!
目前还没有任何答案,快来回答吧!