azure:对于准备好的语句,超过了每个会话20MB的内存限制

vwoqyblh  于 2021-06-24  发布在  Pig
关注(0)|答案(2)|浏览(329)

我正在执行一批批,包括准备好的 insert 声明

public static void main(String... args) throws Exception {
    Class.forName("com.microsoft.sqlserver.jdbc.SQLServerDriver");
    BufferedReader csv = new BufferedReader(new InputStreamReader(Main.class.getClassLoader().getResourceAsStream("records.csv")));
    String line;
    createConnectionAndPreparedStatement();
    while ((line = csv.readLine()) != null) {
        tupleNum++;
        count++;
        List<String> row = new ArrayList<String>(Arrays.asList(line.split(";")));

        tupleCache.add(row);
        addBatch(row, ps);
        if (count > BATCH_SIZE) {
            count = 0;
            executeBatch(ps);
            tupleCache.clear();
        }
    }
}

protected static void createConnectionAndPreparedStatement() throws SQLException {
    System.out.println("Opening new connection!");
    con = DriverManager.getConnection(jdbcUrl, jdbcUser, jdbcPassword);
    con.setAutoCommit(true);
    con.setAutoCommit(false);
    ps = con.prepareStatement(insertQuery);

    count = 0;
}

private static void executeBatch(PreparedStatement ps) throws SQLException, IOException, InterruptedException {
    try {
        ps.executeBatch();
    } catch (BatchUpdateException bue) {
        if (bue.getMessage() != null && bue.getMessage().contains("Exceeded the memory limit")) {
            // silently close the old connection to free resources
            try {
                con.close();
            } catch (Exception ex) {}
            createConnectionAndPreparedStatement();
            for (List<String> t : tupleCache) {
                addBatch(t, ps);
            }
            // let's retry once
            ps.executeBatch();
        }
    }
    System.out.println("Batch succeeded! -->" + tupleNum );
    con.commit();
    ps.clearWarnings();
    ps.clearBatch();
    ps.clearParameters();
}

private static void addBatch(List<String> tuple, PreparedStatement ps) throws SQLException {
    int sqlPos = 1;
    int size = tuple.size();
    for (int i = 0; i < size; i++) {
        String field = tuple.get(i);
        //log.error(String.format("Setting value at pos [%s] to value [%s]", i, field));
        if (field != null) {
            ps.setString(sqlPos, field);
            sqlPos++;
        } else {
            ps.setNull(sqlPos, java.sql.Types.VARCHAR);
            sqlPos++;
        }
    }
    ps.addBatch();
}

因此,在独立应用程序中,一切正常,在700k批插入之后没有异常发生。但当我在自定义pig中执行相同的代码时 StoreFunc 在大约6-7k批插入之后,我得到以下异常:

java.sql.BatchUpdateException: 112007;Exceeded the memory limit of 20 MB per session for prepared statements. Reduce the number or size of the prepared statements.
    at com.microsoft.sqlserver.jdbc.SQLServerPreparedStatement.executeBatch(SQLServerPreparedStatement.java:1824)

只有重新启动连接才有帮助。有人能帮我想一想为什么会这样以及如何解决它吗?

i5desfxk

i5desfxk1#

根据您的描述和错误信息,根据我的经验,我认为问题是由sqlazure服务器端的内存配置引起的,例如服务器资源池中连接的内存限制。
我试着按照线索寻找关于连接内存限制的具体解释,但是失败了,除了下面的内容。
连接存储器
SQLServer为来自客户机的每个连接预留三个数据包缓冲区。每个缓冲区的大小都是根据sp\u configure存储过程指定的默认网络数据包大小来确定的。如果默认网络数据包大小小于8kb,则这些数据包的内存来自sql server的缓冲池。如果是8kb或更大,则从sql server的memtoleave区域分配内存。
我继续寻找 packet size & MemToLeave 并查看它们。
基于以上信息,我猜“对于准备好的语句,超过了每个会话20MB的内存限制”是指在SQLAzure示例的最大内存缓冲池上并行连接使用的所有内存。
所以我建议了两个解决方案,你可以试试。
建议降低 BATCH_SIZE 变量使服务器内存开销小于内存缓冲池的最大大小。
尝试扩展SQLAzure示例。
希望有帮助。
这里有两个新建议。
我真的不确定msjdbc驱动程序是否支持当前使用apachepig来完成类似并行etl工作的场景。请尝试使用 jtds jdbc驱动程序而不是ms驱动程序。
我认为更好的方法是使用更专业的工具来实现这一点,比如 sqoop 或者 kettle .

a64a0gku

a64a0gku2#

当我尝试向azuresql数据仓库写入Dataframe时,我遇到了同样的问题。我指定了chunksize,为load用户分配了最大的资源类。然而,这个问题仍然存在。
根据文档,insert value语句在默认情况下只使用smallrc资源类。
我能想到的唯一解决方案是扩大dwu的规模,但这不是一个最佳的解决方案,因为成本会非常高。

相关问题