HBase vs Google Bigtable: scanning a large number of rows
I am trying to scan a Bigtable table using a start row and an end row; there are roughly 100K rows between them.
I want to fetch them in batches, which I can do in HBase with setCaching(500) (sketched below).
In Bigtable, setCaching appears to be ignored and the client tries to fetch the entire result set in a single RPC. How can I get behaviour similar to HBase?
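For reference, this is roughly the batching pattern I mean on the HBase side (a minimal sketch; the connection, table name and row range are placeholders):

Scan scan = new Scan(startRow, endRow);
// In HBase, setCaching controls how many rows each scanner RPC fetches,
// so the client pulls the range back 500 rows at a time.
scan.setCaching(500);
try (Table table = connection.getTable(TableName.valueOf("my-table"));
     ResultScanner scanner = table.getScanner(scan)) {
  for (Result row : scanner) {
    // process row
  }
}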
I am using the Java driver bigtable-hbase-1.1, version 1.0.0-pre3.
Bigtable configuration:
Configuration conf = new Configuration();
conf.set("google.bigtable.buffered.mutator.throttling.enable", "false");
conf.set("google.bigtable.rpc.timeout.ms", "1500000");
conf.set("google.bigtable.grpc.read.partial.row.timeout.ms","1500000");
conf.set("google.bigtable.long.rpc.timeout.ms", "1500000");
conf.set("google.bigtable.grpc.retry.deadlineexceeded.enable", "false");
conf.set("google.bigtable.buffered.mutator.max.inflight.rpcs", "500");
conf.set("google.bigtable.bulk.max.row.key.count", "500");
Configuration conff = BigtableConfiguration.configure(conf,projectID,instanceID);
connection = BigtableConfiguration.connect(conff);
Scanner configuration:
byte[] start = "prefix".getbytes() ;
byte[] end = Bytes.add("prefix".getbytes(),(byte))0xff);
Scan scan = new Scan(start, end);
The expected output is on the order of 100K rows.
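For what it's worth, the 0xff end key above is just meant to cover everything under the prefix; I believe the same range can be expressed with Scan.setRowPrefixFilter in the HBase 1.x API (a minimal equivalent sketch):

Scan scan = new Scan();
// setRowPrefixFilter sets the start row to the prefix and computes the
// matching stop row, i.e. the same range as the manual 0xff end key above.
scan.setRowPrefixFilter("prefix".getBytes());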
You do not have to worry about batching when reading rows. Bigtable responses are streamed and backpressure-aware; we also rely on gRPC to buffer chunks of the stream.
Here is an introduction to gRPC streaming:
https://grpc.io/docs/guides/concepts.html#server-streaming-rpc
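If you still want to consume the results in fixed-size client-side batches, the plain HBase API already allows that on top of the stream; here is a minimal sketch, assuming a table handle and the Scan from your snippet:

try (ResultScanner scanner = table.getScanner(scan)) {
  // next(n) hands back up to n buffered rows at a time; the Bigtable
  // client keeps streaming and refilling its buffer underneath.
  Result[] batch;
  while ((batch = scanner.next(500)).length > 0) {
    for (Result row : batch) {
      // process row
    }
  }
}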
Would you mind trying out the sample code below and letting me know whether it works (i.e. no deadline-exceeded errors)? If the sample works, please modify it to scan your own data and confirm it still works. If anything is off, let me know.
pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.google.cloud.example</groupId>
<artifactId>row-write-read-example</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.cloud.bigtable</groupId>
<artifactId>bigtable-hbase-1.x</artifactId>
<version>1.0.0-pre3</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.2</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
java:
import com.google.cloud.bigtable.hbase.BigtableConfiguration;
import java.io.IOException;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
public class WriteReadTest {
private static final String PROJECT_ID = "<YOUR_PROJECT_ID>";
private static final String INSTANCE_ID = "<YOUR_INSTANCE_ID>";
private static final String TABLE_ID = "<YOUR_NONEXISTENT_TABLE>";
private static final String FAMILY = "cf";
private static final TableName TABLE_NAME = TableName.valueOf(TABLE_ID);
public static void main(String[] args) throws IOException {
try(Connection connection = BigtableConfiguration.connect(PROJECT_ID, INSTANCE_ID);
Admin admin = connection.getAdmin()) {
// Setup
admin.createTable(
new HTableDescriptor(TABLE_NAME)
.addFamily(new HColumnDescriptor(FAMILY))
);
try {
// Write the rows
populateTable(connection, 2_000_000);
// Read the rows
readFullTable(connection);
} finally {
admin.disableTable(TABLE_NAME);
admin.deleteTable(TABLE_NAME);
}
}
}
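  // Writes rowCount rows through a BufferedMutator, spreading the keys
  // across 100 zero-padded prefix buckets so the writes are distributed.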
private static void populateTable(Connection connection, int rowCount) throws IOException {
long startTime = System.currentTimeMillis();
int buckets = 100;
int maxWidth = Integer.toString(buckets).length();
try(BufferedMutator bufferedMutator = connection.getBufferedMutator(TABLE_NAME)) {
for (int i = 0; i < rowCount; i++) {
String prefix = String.format("%0" + maxWidth + "d", i % buckets);
String key = prefix + "-" + String.format("%010d", i);
String value = "value-" + key;
Put put = new Put(key.getBytes())
.addColumn(
FAMILY.getBytes(),
HConstants.EMPTY_BYTE_ARRAY,
value.getBytes()
);
bufferedMutator.mutate(put);
}
}
long endTime = System.currentTimeMillis();
System.out.printf("Populated table in %d secs, writing %d rows\n", (endTime - startTime) / 1000, rowCount);
}
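  // Scans the whole key range ["0", "z") with a single Scan and counts
  // the rows returned by the streaming scanner.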
private static void readFullTable(Connection connection) throws IOException {
long startTime = System.currentTimeMillis();
int count = 0;
try(Table table = connection.getTable(TABLE_NAME);
ResultScanner scanner = table.getScanner(new Scan("0".getBytes(), "z".getBytes()))) {
for(Result row = scanner.next(); row != null; row = scanner.next()) {
count++;
}
}
long endTime = System.currentTimeMillis();
System.out.printf("Scanned table in %d secs, reading %d rows\n", (endTime - startTime) / 1000, count);
}
}