从非常大的 table 中获取数据
Getting data from a very large table
我在 MySQL 数据库中有一个非常大的 table,在 table Users
.
中有 2 亿条记录
我查询,使用JDBC:
public List<Pair<Long, String>> getUsersAll() throws SQLException {
Connection cnn = null;
CallableStatement cs = null;
ResultSet rs = null;
final List<Pair<Long, String>> res = new ArrayList<>();
try {
cnn = dataSource.getConnection();
cs = cnn.prepareCall("select UserPropertyKindId, login from TEST.users;");
rs = cs.executeQuery();
while (rs.next()) {
res.add(new ImmutablePair<>(rs.getLong(1), rs.getString(2)));
}
return res;
} catch (SQLException ex) {
throw ex;
} finally {
DbUtils.closeQuietly(cnn, cs, rs);
}
}
接下来,我处理结果:
List<Pair<Long, String>> users= dao.getUsersAll();
if (CollectionUtils.isNotEmpty(users)) {
for (List<Pair<Long, String>> partition : Lists.partition(users, 2000)) {
InconsistsUsers.InconsistsUsersCallable callable = new InconsistsUsers.InconsistsUsersCallable (new ArrayList<>(partition));
processExecutor.submit(callable);
}
}
但是由于 table 非常大并且全部卸载到内存中,我的应用程序崩溃并出现错误:
com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure
The last packet successfully received from the server was 105,619 milliseconds ago.
如何分批接收数据并按优先顺序处理,才不会一下子把结果全部上传到内存?可以创建游标并将数据上传到非阻塞队列,并在数据到达时对其进行处理。如何做到这一点?
更新:
我的数据库结构:https://www.db-fiddle.com/f/v377ZHkG1YZcdQsETtPm9L/3
当前算法:
从Users
table获取所有数据用户:select UserPropertyKindId, login from Users;
此结果拆分为 2000 对并提交给 ThreadPoolTaskExecutor
:
List<Pair<Long, String>> users= dao.getUsersAll();
if (CollectionUtils.isNotEmpty(users)) {
for (List<Pair<Long, String>> partition : Lists.partition(users, 2000)) {
InconsistsUsers.InconsistsUsersCallable callable = new InconsistsUsers.InconsistsUsersCallable(new ArrayList<>(partition));
processExecutor.submit(callable));
}
}
In callable for each pair make two queries:
第一次查询:
select distinct entityId
from UserPropertyValue
where userPropertyKindId= ? and value = ? -- value its login from Users table
第二次查询:
select UserIds
from UserPropertyIndex
where UserPropertyKindId = ? and Value = ?
可能有两种情况:
- 第一次查询结果为空:记录,发送通知,继续下一对
- 第二次查询的结果不等于第一次查询的结果(已解码 varbinary 数据。存储了已编码的 entityId)。然后登录,发送通知,转到下一对。
我无法更改基地的结构。我必须在 Java 代码端进行的所有操作。
而不是 Java 端的 Lists.partition(users, 2000),您应该将 mysql 结果集限制为每个请求 2000 个。
select UserPropertyKindId, login from TEST.users limit <offset>, 2000;
更新:正如 Raymond Nijland 在下面的评论中提到的,如果偏移量太大,查询速度可能会显着降低。
一种解决方法是不使用偏移量,而是引入 where 语句,例如 where id > last_user_id。
由于@All_safe在下面评论,自动增量id不存在,大限制偏移的另一个解决方法是:仅在子查询中获取主键,然后连接回主键table.这将强制 mysql 不进行早期行查找,这是大偏移量限制的主要问题。
但您的原始查询仅获取主键列,我认为早期行查找不适用。
您可以将您的优先级加入到查询中
例如,WHERE my_priority = 1 ORDER BY my_sub_priority DESC
就像 Jacob 所说的那样,使用限制 LIMIT 0, 2000
您或许可以分解 inconsistent_users 中的逻辑以查找特定缺陷,然后根据在 EXPLAIN 中获得的见解优化这些查询。也许 find_user_defect(defect) 一种方法可以帮助您按设置处理用户。
我 运行 遇到过类似的情况。我正在从 MySQL 数据库中读取数据并将其复制到 MS SQL 服务器数据库中。不是2亿,一天只有400万。但是我在通信 link 失败时收到了相同的错误消息。我可以通过设置 PreparedStatement.setFetchSize(Integer.MIN_VALUE) 的 fetchsize 来解决它;
于是通讯link失败就消失了。我知道,这不能解决您的列表问题。
你应该在几个层面上处理这个问题:
JDBC 驱动程序获取大小
JDBC 有一个 Statement.setFetchSize()
method, which indicates how many rows are going to be pre-fetched by the JDBC driver prior to you getting them from JDBC. Note that MySQL JDBC drivers don't really implement this correctly, but you can set setFetchSize(Integer.MIN_VALUE)
to prevent it from fetching all rows in one go. See also this answer here.
请注意,您还可以使用 useCursorFetch
在您的连接上激活该功能
你自己的逻辑
你不应该把整个用户列表都放在内存中。您现在正在做的是收集 JDBC 中的所有行,然后稍后使用 Lists.partition(users, 2000)
对您的列表进行分区。这是朝着正确的方向前进,但你还没有做对。相反,做:
try (ResultSet rs = cs.executeQuery()) {
while (rs.next()) {
res.add(new ImmutablePair<>(rs.getLong(1), rs.getString(2)));
}
// Process a batch of rows:
if (res.size() >= 2000) {
process(res);
res.clear();
}
}
// Process the remaining rows
process(res);
这里的重要信息是不要加载内存中的所有行 然后 批量处理它们,而是在从 JDBC 流式传输行时直接处理它们。
我在 MySQL 数据库中有一个非常大的 table,在 table Users
.
我查询,使用JDBC:
public List<Pair<Long, String>> getUsersAll() throws SQLException {
Connection cnn = null;
CallableStatement cs = null;
ResultSet rs = null;
final List<Pair<Long, String>> res = new ArrayList<>();
try {
cnn = dataSource.getConnection();
cs = cnn.prepareCall("select UserPropertyKindId, login from TEST.users;");
rs = cs.executeQuery();
while (rs.next()) {
res.add(new ImmutablePair<>(rs.getLong(1), rs.getString(2)));
}
return res;
} catch (SQLException ex) {
throw ex;
} finally {
DbUtils.closeQuietly(cnn, cs, rs);
}
}
接下来,我处理结果:
List<Pair<Long, String>> users= dao.getUsersAll();
if (CollectionUtils.isNotEmpty(users)) {
for (List<Pair<Long, String>> partition : Lists.partition(users, 2000)) {
InconsistsUsers.InconsistsUsersCallable callable = new InconsistsUsers.InconsistsUsersCallable (new ArrayList<>(partition));
processExecutor.submit(callable);
}
}
但是由于 table 非常大并且全部卸载到内存中,我的应用程序崩溃并出现错误:
com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure
The last packet successfully received from the server was 105,619 milliseconds ago.
如何分批接收数据并按优先顺序处理,才不会一下子把结果全部上传到内存?可以创建游标并将数据上传到非阻塞队列,并在数据到达时对其进行处理。如何做到这一点?
更新:
我的数据库结构:https://www.db-fiddle.com/f/v377ZHkG1YZcdQsETtPm9L/3
当前算法:
从
Users
table获取所有数据用户:select UserPropertyKindId, login from Users;
此结果拆分为 2000 对并提交给
ThreadPoolTaskExecutor
:List<Pair<Long, String>> users= dao.getUsersAll(); if (CollectionUtils.isNotEmpty(users)) { for (List<Pair<Long, String>> partition : Lists.partition(users, 2000)) { InconsistsUsers.InconsistsUsersCallable callable = new InconsistsUsers.InconsistsUsersCallable(new ArrayList<>(partition)); processExecutor.submit(callable)); } }
In callable for each pair make two queries:
第一次查询:
select distinct entityId from UserPropertyValue where userPropertyKindId= ? and value = ? -- value its login from Users table
第二次查询:
select UserIds from UserPropertyIndex where UserPropertyKindId = ? and Value = ?
可能有两种情况:
- 第一次查询结果为空:记录,发送通知,继续下一对
- 第二次查询的结果不等于第一次查询的结果(已解码 varbinary 数据。存储了已编码的 entityId)。然后登录,发送通知,转到下一对。
我无法更改基地的结构。我必须在 Java 代码端进行的所有操作。
而不是 Java 端的 Lists.partition(users, 2000),您应该将 mysql 结果集限制为每个请求 2000 个。
select UserPropertyKindId, login from TEST.users limit <offset>, 2000;
更新:正如 Raymond Nijland 在下面的评论中提到的,如果偏移量太大,查询速度可能会显着降低。
一种解决方法是不使用偏移量,而是引入 where 语句,例如 where id > last_user_id。
由于@All_safe在下面评论,自动增量id不存在,大限制偏移的另一个解决方法是:仅在子查询中获取主键,然后连接回主键table.这将强制 mysql 不进行早期行查找,这是大偏移量限制的主要问题。
但您的原始查询仅获取主键列,我认为早期行查找不适用。
您可以将您的优先级加入到查询中
例如,WHERE my_priority = 1 ORDER BY my_sub_priority DESC
就像 Jacob 所说的那样,使用限制 LIMIT 0, 2000
您或许可以分解 inconsistent_users 中的逻辑以查找特定缺陷,然后根据在 EXPLAIN 中获得的见解优化这些查询。也许 find_user_defect(defect) 一种方法可以帮助您按设置处理用户。
我 运行 遇到过类似的情况。我正在从 MySQL 数据库中读取数据并将其复制到 MS SQL 服务器数据库中。不是2亿,一天只有400万。但是我在通信 link 失败时收到了相同的错误消息。我可以通过设置 PreparedStatement.setFetchSize(Integer.MIN_VALUE) 的 fetchsize 来解决它; 于是通讯link失败就消失了。我知道,这不能解决您的列表问题。
你应该在几个层面上处理这个问题:
JDBC 驱动程序获取大小
JDBC 有一个 Statement.setFetchSize()
method, which indicates how many rows are going to be pre-fetched by the JDBC driver prior to you getting them from JDBC. Note that MySQL JDBC drivers don't really implement this correctly, but you can set setFetchSize(Integer.MIN_VALUE)
to prevent it from fetching all rows in one go. See also this answer here.
请注意,您还可以使用 useCursorFetch
你自己的逻辑
你不应该把整个用户列表都放在内存中。您现在正在做的是收集 JDBC 中的所有行,然后稍后使用 Lists.partition(users, 2000)
对您的列表进行分区。这是朝着正确的方向前进,但你还没有做对。相反,做:
try (ResultSet rs = cs.executeQuery()) {
while (rs.next()) {
res.add(new ImmutablePair<>(rs.getLong(1), rs.getString(2)));
}
// Process a batch of rows:
if (res.size() >= 2000) {
process(res);
res.clear();
}
}
// Process the remaining rows
process(res);
这里的重要信息是不要加载内存中的所有行 然后 批量处理它们,而是在从 JDBC 流式传输行时直接处理它们。