java—从非常大的表中获取数据

k4ymrczo  于 2021-06-21  发布在  Mysql
关注(0)|答案(4)|浏览(394)

我在mysql数据库中有一个非常大的表,表中有2亿条记录 Users .
我使用jdbc进行查询:

public List<Pair<Long, String>> getUsersAll() throws SQLException {
        Connection cnn = null;
        CallableStatement cs = null;
        ResultSet rs = null;
        final List<Pair<Long, String>> res = new ArrayList<>();
        try {
            cnn = dataSource.getConnection();
            cs = cnn.prepareCall("select UserPropertyKindId, login from TEST.users;");
            rs = cs.executeQuery();
            while (rs.next()) {
                res.add(new ImmutablePair<>(rs.getLong(1), rs.getString(2)));
            }
            return res;
        } catch (SQLException ex) {
            throw ex;
        } finally {
            DbUtils.closeQuietly(cnn, cs, rs);
        }
    }

接下来,我处理结果:

List<Pair<Long, String>> users= dao.getUsersAll();
            if (CollectionUtils.isNotEmpty(users)) {
                for (List<Pair<Long, String>> partition : Lists.partition(users, 2000)) {
                    InconsistsUsers.InconsistsUsersCallable callable = new InconsistsUsers.InconsistsUsersCallable (new ArrayList<>(partition));
                    processExecutor.submit(callable);
                }
            }

但由于表非常大,而且都已卸载到内存中,因此我的应用程序会因以下错误而崩溃:
com.mysql.jdbc.exceptions.jdbc4.communicationsexception:通信链路故障
从服务器成功接收的最后一个数据包是105619毫秒前。
如何接收部分数据并按优先级顺序进行处理,以避免将所有结果一次上载到内存中?可以创建一个游标,将数据上传到非阻塞队列,并在数据到达时对其进行处理。如何做到这一点?
更新:
我的数据库结构:https://www.db-fiddle.com/f/v377zhkg1yzcdqsettpm9l/3
当前算法:
获取用户的所有数据 Users 表格: select UserPropertyKindId, login from Users; 此结果分成2000对,并提交给 ThreadPoolTaskExecutor :

List<Pair<Long, String>> users= dao.getUsersAll();

if (CollectionUtils.isNotEmpty(users)) {
    for (List<Pair<Long, String>> partition : Lists.partition(users, 2000)) {
        InconsistsUsers.InconsistsUsersCallable callable = new InconsistsUsers.InconsistsUsersCallable(new ArrayList<>(partition));
        processExecutor.submit(callable));
    }
}

在callable for each pair中进行两个查询:
第一个查询:

select distinct entityId 
from UserPropertyValue 
where userPropertyKindId= ? and value = ? -- value its login from Users table

第二个查询:

select UserIds 
from UserPropertyIndex 
where UserPropertyKindId = ? and Value = ?

可能有两种情况:
第一个查询的结果为空:记录、发送通知、继续下一对
第二个查询的结果不等于第一个查询的结果(varbinary data decoded)。存在存储的编码实体ID)。然后登录,发送通知,转到下一对。
我不能改变基地的结构。所有我必须在java代码方面做的操作。

ttp71kqs

ttp71kqs1#

不要在java端使用lists.partition(users,2000),而应该将mysql结果集限制为每个请求2000个。

select UserPropertyKindId, login from TEST.users limit <offset>, 2000;

更新:正如raymondnijland在下面的评论中提到的,如果偏移量太大,查询速度可能会明显减慢。
一种解决方法是不使用偏移量,而是引入where语句,例如where id>last\u user\u id。
由于@all\u safe在下面进行了注解,因此不存在自动增量标识,因此对于大限制偏移量的另一种解决方法是:仅在子查询中获取主键,然后联接回主表。这将迫使mysql不进行早期行查找,这是偏移量限制过大的主要问题。
但您最初的查询只获取主键列,我认为早期的行查找不适用。

tvz2xvvm

tvz2xvvm2#

您可以将优先级烘焙到查询中,例如。, WHERE my_priority = 1 ORDER BY my_sub_priority DESC 就像雅各布说的,使用极限 LIMIT 0, 2000 您可能可以分解不一致用户的逻辑以查找特定的缺陷,然后使用在explain中获得的见解优化这些查询。也许一个find\u user\u defect(缺陷)的方法可以帮助您处理setwise用户。

qnzebej0

qnzebej03#

您应该在几个层次上处理这个问题:

jdbc驱动程序获取大小

jdbc有一个 Statement.setFetchSize() 方法,它指示在从jdbc获取行之前,jdbc驱动程序将预取多少行。请注意,mysql jdbc驱动程序并没有真正正确地实现这一点,但是您可以设置 setFetchSize(Integer.MIN_VALUE) 以防止它一次性获取所有行。在这里也可以看到这个答案。
注意,您也可以使用激活连接上的功能 useCursorFetch ####你自己的逻辑
您不应该将整个用户列表放在内存中。现在要做的是从jdbc收集所有的行,然后使用 Lists.partition(users, 2000) . 这是朝着正确的方向发展,但你还没有做对。相反,请执行以下操作:

try (ResultSet rs = cs.executeQuery()) {
    while (rs.next()) {
        res.add(new ImmutablePair<>(rs.getLong(1), rs.getString(2)));
    }

    // Process a batch of rows:
    if (res.size() >= 2000) {
        process(res);
        res.clear();
    }
}

// Process the remaining rows
process(res);

这里的重要信息不是加载内存中的所有行,然后成批处理它们,而是在从jdbc流式处理行时直接处理它们。

xvw2m8pv

xvw2m8pv4#

我也遇到过类似的情况。我正在从mysql数据库读取数据并将其复制到mssqlserverdb中。不是2亿,每天只有400万。但我有同样的错误信息与通信链路故障。我可以通过设置preparedstatement.setfetchsize(integer.min\u value)的fetchsize来解决这个问题;所以通信链路故障消失了。我知道,这不能解决你的单子问题。

相关问题