使用 jdbc 将行批量插入 Spanner 时加载性能低下
low loading performance while batch inserting rows into Spanner using jdbc
背景:我正在尝试将 TSV 格式的数据文件(从 MySQL 数据库转储)加载到 GCP Spanner table。
- 客户端库:官方Spanner JDBC依赖v1.15.0
- table 架构:两个字符串类型的列和十个 int 类型的列
- GCP Spanner 实例:配置为具有 5 个节点的多区域 nam6
我的加载程序运行在GCP VM中,是访问Spanner实例的独占客户端。自动提交已启用。批量插入是我的程序执行的唯一 DML 操作,批量大小在 1500 左右。在每次提交中,它完全用完了突变限制,即 20000。同时,提交大小低于 5MB(值两个字符串类型的列是小型的)。根据主键的第一列对行进行分区,以便每次提交都可以发送到很少的分区以获得更好的性能。
通过上述所有配置和优化,插入率仅为每秒 1k 行左右。这真的让我很失望,因为我有超过 8 亿行要插入。我确实注意到 the official doc 提到了大约。多区域 Spanner 实例的峰值写入(QPS 总计)为 1800。
所以我这里有两个问题:
- 考虑到如此低的峰值写入 QPS,是否意味着 GCP 不期望或不支持客户将大型数据集迁移到多区域 Spanner 实例?
- 我看到 Spanner 监控的读取延迟很高。我没有任何阅读请求。我的猜测是,在写入行时,Spanner 需要先读取并检查是否存在具有相同主键的行。如果我的猜测是正确的,为什么要花这么多时间?如果没有,我能否获得有关这些读取操作如何发生的任何指导?
我不太清楚您是如何设置加载数据的客户端应用程序的。我的初步印象是您的客户端应用程序可能没有并行执行足够的事务。您通常应该能够插入远远超过 1,000 rows/second,但它需要您并行执行多个事务,可能来自多个 VM。我使用以下简单示例来测试从我的本地计算机到 单个 节点 Spanner 实例的负载吞吐量,这给了我大约 1,500 rows/second.[=14 的吞吐量=]
在与您的 Spanner 实例位于同一网络区域的一个或多个 VM 中使用客户端应用程序 运行 的 multi-node 设置应该能够实现比这更高的容量。
import com.google.api.client.util.Base64;
import com.google.common.base.Stopwatch;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
public class TestJdbc {
public static void main(String[] args) {
final int threads = 512;
ExecutorService executor = Executors.newFixedThreadPool(threads);
watch = Stopwatch.createStarted();
for (int i = 0; i < threads; i++) {
executor.submit(new InsertRunnable());
}
}
static final AtomicLong rowCount = new AtomicLong();
static Stopwatch watch;
static final class InsertRunnable implements Runnable {
@Override
public void run() {
try (Connection connection =
DriverManager.getConnection(
"jdbc:cloudspanner:/projects/my-project/instances/my-instance/databases/my-db")) {
while (true) {
try (PreparedStatement ps =
connection.prepareStatement("INSERT INTO Test (Id, Col1, Col2) VALUES (?, ?, ?)")) {
for (int i = 0; i < 150; i++) {
ps.setLong(1, rnd.nextLong());
ps.setString(2, randomString(100));
ps.setString(3, randomString(100));
ps.addBatch();
rowCount.incrementAndGet();
}
ps.executeBatch();
}
System.out.println("Rows inserted: " + rowCount);
System.out.println("Rows/second: " + rowCount.get() / watch.elapsed(TimeUnit.SECONDS));
}
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
private final Random rnd = new Random();
private String randomString(int maxLength) {
byte[] bytes = new byte[rnd.nextInt(maxLength / 2) + 1];
rnd.nextBytes(bytes);
return Base64.encodeBase64String(bytes);
}
}
}
您还可以尝试调整其他几项以获得更好的结果:
- 减少每批次的行数可以产生更好的整体结果。
- 如果可能,使用
InsertOrUpdate
变异对象比使用 DML 语句更有效(参见下面的示例)。
使用 Mutation
而不是 DML 的示例:
import com.google.api.client.util.Base64;
import com.google.cloud.spanner.Mutation;
import com.google.cloud.spanner.jdbc.CloudSpannerJdbcConnection;
import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableList;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
public class TestJdbc {
public static void main(String[] args) {
final int threads = 512;
ExecutorService executor = Executors.newFixedThreadPool(threads);
watch = Stopwatch.createStarted();
for (int i = 0; i < threads; i++) {
executor.submit(new InsertOrUpdateMutationRunnable());
}
}
static final AtomicLong rowCount = new AtomicLong();
static Stopwatch watch;
static final class InsertOrUpdateMutationRunnable implements Runnable {
@Override
public void run() {
try (Connection connection =
DriverManager.getConnection(
"jdbc:cloudspanner:/projects/my-project/instances/my-instance/databases/my-db")) {
CloudSpannerJdbcConnection csConnection = connection.unwrap(CloudSpannerJdbcConnection.class);
CloudSpannerJdbcConnection csConnection =
connection.unwrap(CloudSpannerJdbcConnection.class);
while (true) {
ImmutableList.Builder<Mutation> builder = ImmutableList.builder();
for (int i = 0; i < 150; i++) {
builder.add(
Mutation.newInsertOrUpdateBuilder("Test")
.set("Id")
.to(rnd.nextLong())
.set("Col1")
.to(randomString(100))
.set("Col2")
.to(randomString(100))
.build());
rowCount.incrementAndGet();
}
csConnection.write(builder.build());
System.out.println("Rows inserted: " + rowCount);
System.out.println("Rows/second: " + rowCount.get() / watch.elapsed(TimeUnit.SECONDS));
}
}
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
private final Random rnd = new Random();
private String randomString(int maxLength) {
byte[] bytes = new byte[rnd.nextInt(maxLength / 2) + 1];
rnd.nextBytes(bytes);
return Base64.encodeBase64String(bytes);
}
}
}
上面的简单示例在没有任何进一步调整的情况下给出了大约 35,000 rows/second 的吞吐量。
其他信息 2020-08-21:变异对象比(批处理)DML 语句更有效的原因是 DML 语句在内部转换为由 Cloud 读取查询Spanner,然后用于创建突变。这种转换需要对批处理中的每个 DML 语句进行,这意味着具有 1,500 个简单插入语句的 DML 批处理将触发 1,500 个(小)读取查询,需要转换为 1,500 个突变。这很可能也是您在监控中看到读取延迟的原因。
您是否介意分享更多有关您的客户端应用程序的外观以及您有多少个实例的信息?运行?
要插入超过 8 亿行,并且看到您是 Java 程序员,我可以建议在 Dataflow 上使用 Beam 吗?
spanner writer in Beam 旨在使其写入尽可能高效 - 通过相似的键对行进行分组,并在您执行操作时对它们进行批处理。 Beam on Dataflow 还可以使用多个 worker VM 并行执行多个文件读取和 spanner 写入...
使用多区域 spanner 实例,您应该能够获得大约 1800 rows per node per second 的插入速度(如 Knut 的回复所建议的那样,如果行很小并且是批处理的,则插入速度更快)并且使用 5 个 spanner 节点,您可能有10 到 20 个导入程序线程 运行 并行 - 无论是使用您的导入程序还是使用 Dataflow。
(披露:我是 Beam SpannerIO 维护者)
背景:我正在尝试将 TSV 格式的数据文件(从 MySQL 数据库转储)加载到 GCP Spanner table。
- 客户端库:官方Spanner JDBC依赖v1.15.0
- table 架构:两个字符串类型的列和十个 int 类型的列
- GCP Spanner 实例:配置为具有 5 个节点的多区域 nam6
我的加载程序运行在GCP VM中,是访问Spanner实例的独占客户端。自动提交已启用。批量插入是我的程序执行的唯一 DML 操作,批量大小在 1500 左右。在每次提交中,它完全用完了突变限制,即 20000。同时,提交大小低于 5MB(值两个字符串类型的列是小型的)。根据主键的第一列对行进行分区,以便每次提交都可以发送到很少的分区以获得更好的性能。
通过上述所有配置和优化,插入率仅为每秒 1k 行左右。这真的让我很失望,因为我有超过 8 亿行要插入。我确实注意到 the official doc 提到了大约。多区域 Spanner 实例的峰值写入(QPS 总计)为 1800。
所以我这里有两个问题:
- 考虑到如此低的峰值写入 QPS,是否意味着 GCP 不期望或不支持客户将大型数据集迁移到多区域 Spanner 实例?
- 我看到 Spanner 监控的读取延迟很高。我没有任何阅读请求。我的猜测是,在写入行时,Spanner 需要先读取并检查是否存在具有相同主键的行。如果我的猜测是正确的,为什么要花这么多时间?如果没有,我能否获得有关这些读取操作如何发生的任何指导?
我不太清楚您是如何设置加载数据的客户端应用程序的。我的初步印象是您的客户端应用程序可能没有并行执行足够的事务。您通常应该能够插入远远超过 1,000 rows/second,但它需要您并行执行多个事务,可能来自多个 VM。我使用以下简单示例来测试从我的本地计算机到 单个 节点 Spanner 实例的负载吞吐量,这给了我大约 1,500 rows/second.[=14 的吞吐量=]
在与您的 Spanner 实例位于同一网络区域的一个或多个 VM 中使用客户端应用程序 运行 的 multi-node 设置应该能够实现比这更高的容量。
import com.google.api.client.util.Base64;
import com.google.common.base.Stopwatch;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
public class TestJdbc {
public static void main(String[] args) {
final int threads = 512;
ExecutorService executor = Executors.newFixedThreadPool(threads);
watch = Stopwatch.createStarted();
for (int i = 0; i < threads; i++) {
executor.submit(new InsertRunnable());
}
}
static final AtomicLong rowCount = new AtomicLong();
static Stopwatch watch;
static final class InsertRunnable implements Runnable {
@Override
public void run() {
try (Connection connection =
DriverManager.getConnection(
"jdbc:cloudspanner:/projects/my-project/instances/my-instance/databases/my-db")) {
while (true) {
try (PreparedStatement ps =
connection.prepareStatement("INSERT INTO Test (Id, Col1, Col2) VALUES (?, ?, ?)")) {
for (int i = 0; i < 150; i++) {
ps.setLong(1, rnd.nextLong());
ps.setString(2, randomString(100));
ps.setString(3, randomString(100));
ps.addBatch();
rowCount.incrementAndGet();
}
ps.executeBatch();
}
System.out.println("Rows inserted: " + rowCount);
System.out.println("Rows/second: " + rowCount.get() / watch.elapsed(TimeUnit.SECONDS));
}
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
private final Random rnd = new Random();
private String randomString(int maxLength) {
byte[] bytes = new byte[rnd.nextInt(maxLength / 2) + 1];
rnd.nextBytes(bytes);
return Base64.encodeBase64String(bytes);
}
}
}
您还可以尝试调整其他几项以获得更好的结果:
- 减少每批次的行数可以产生更好的整体结果。
- 如果可能,使用
InsertOrUpdate
变异对象比使用 DML 语句更有效(参见下面的示例)。
使用 Mutation
而不是 DML 的示例:
import com.google.api.client.util.Base64;
import com.google.cloud.spanner.Mutation;
import com.google.cloud.spanner.jdbc.CloudSpannerJdbcConnection;
import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableList;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
public class TestJdbc {
public static void main(String[] args) {
final int threads = 512;
ExecutorService executor = Executors.newFixedThreadPool(threads);
watch = Stopwatch.createStarted();
for (int i = 0; i < threads; i++) {
executor.submit(new InsertOrUpdateMutationRunnable());
}
}
static final AtomicLong rowCount = new AtomicLong();
static Stopwatch watch;
static final class InsertOrUpdateMutationRunnable implements Runnable {
@Override
public void run() {
try (Connection connection =
DriverManager.getConnection(
"jdbc:cloudspanner:/projects/my-project/instances/my-instance/databases/my-db")) {
CloudSpannerJdbcConnection csConnection = connection.unwrap(CloudSpannerJdbcConnection.class);
CloudSpannerJdbcConnection csConnection =
connection.unwrap(CloudSpannerJdbcConnection.class);
while (true) {
ImmutableList.Builder<Mutation> builder = ImmutableList.builder();
for (int i = 0; i < 150; i++) {
builder.add(
Mutation.newInsertOrUpdateBuilder("Test")
.set("Id")
.to(rnd.nextLong())
.set("Col1")
.to(randomString(100))
.set("Col2")
.to(randomString(100))
.build());
rowCount.incrementAndGet();
}
csConnection.write(builder.build());
System.out.println("Rows inserted: " + rowCount);
System.out.println("Rows/second: " + rowCount.get() / watch.elapsed(TimeUnit.SECONDS));
}
}
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
private final Random rnd = new Random();
private String randomString(int maxLength) {
byte[] bytes = new byte[rnd.nextInt(maxLength / 2) + 1];
rnd.nextBytes(bytes);
return Base64.encodeBase64String(bytes);
}
}
}
上面的简单示例在没有任何进一步调整的情况下给出了大约 35,000 rows/second 的吞吐量。
其他信息 2020-08-21:变异对象比(批处理)DML 语句更有效的原因是 DML 语句在内部转换为由 Cloud 读取查询Spanner,然后用于创建突变。这种转换需要对批处理中的每个 DML 语句进行,这意味着具有 1,500 个简单插入语句的 DML 批处理将触发 1,500 个(小)读取查询,需要转换为 1,500 个突变。这很可能也是您在监控中看到读取延迟的原因。
您是否介意分享更多有关您的客户端应用程序的外观以及您有多少个实例的信息?运行?
要插入超过 8 亿行,并且看到您是 Java 程序员,我可以建议在 Dataflow 上使用 Beam 吗?
spanner writer in Beam 旨在使其写入尽可能高效 - 通过相似的键对行进行分组,并在您执行操作时对它们进行批处理。 Beam on Dataflow 还可以使用多个 worker VM 并行执行多个文件读取和 spanner 写入...
使用多区域 spanner 实例,您应该能够获得大约 1800 rows per node per second 的插入速度(如 Knut 的回复所建议的那样,如果行很小并且是批处理的,则插入速度更快)并且使用 5 个 spanner 节点,您可能有10 到 20 个导入程序线程 运行 并行 - 无论是使用您的导入程序还是使用 Dataflow。
(披露:我是 Beam SpannerIO 维护者)