HBase co-processor: checkAndPut causing <Timed out on getting lock for row>
HBase version: 0.94.15-cdh4.7.0

I have a very simple setup:

- table ttt holding the data
- table counters holding the counters (incremented fields)
- a prePut coprocessor on the ttt table

Whenever a row is inserted/updated in ttt, the coprocessor checks whether a value already exists in column d:k for that row.
If there is no value, the coprocessor increments a counter in the counters table and assigns the result to the d:k column via the checkAndPut method.

The code is as follows:
@Override
public void prePut(final ObserverContext<RegionCoprocessorEnvironment> observerContext,
                   final Put put, final WALEdit edit, final boolean writeToWAL) throws IOException {
    HTable tableCounters = null;
    HTable tableTarget = null;
    try {
        Get existingEdwGet = new Get(put.getRow());
        existingEdwGet.addColumn("d".getBytes(), "k".getBytes());
        tableTarget = new HTable(
                this.configuration,
                observerContext.getEnvironment().getRegion().getTableDesc().getName());
        if (!tableTarget.exists(existingEdwGet)) {
            // increment the counter
            tableCounters = new HTable(this.configuration, "counters");
            long newEdwKey = tableCounters.incrementColumnValue(
                    "static_row".getBytes(), "counters".getBytes(), "k".getBytes(), 1);
            Put keySetter = new Put(put.getRow());
            keySetter.add("d".getBytes(), "k".getBytes(), Bytes.toBytes(newEdwKey));
            tableTarget.checkAndPut(put.getRow(), "d".getBytes(), "k".getBytes(), null, keySetter);
        }
    } finally {
        releaseCloseable(tableTarget);
        releaseCloseable(tableCounters);
    }
}
Utility functions/variables:

- releaseCloseable - a simple .close() wrapped in try/catch
- this.configuration - the Hadoop configuration obtained during coprocessor startup
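The releaseCloseable helper is not shown in the post; a minimal sketch of what such a null-safe, exception-swallowing close looks like (class name and log message are illustrative, not from the original):

```java
import java.io.Closeable;
import java.io.IOException;

public class CoprocessorUtils {
    // Minimal sketch of releaseCloseable: null-safe close that swallows
    // IOException so cleanup in a finally block cannot mask the real error.
    public static void releaseCloseable(Closeable closeable) {
        if (closeable == null) {
            return;
        }
        try {
            closeable.close();
        } catch (IOException e) {
            // log and continue; a failed close should not abort the coprocessor
            System.err.println("close() failed: " + e.getMessage());
        }
    }
}
```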
When executing simple PUTs from the hbase shell:

for i in 0..10 do
  put 'ttt', "hrow-#{i}", 'd:column', 'value'
end

the region reports a deadlock (row lock timeout):
2015-07-02 23:58:30,297 ERROR org.apache.hadoop.hbase.regionserver.HRegionServer (IPC Server handler 43 on 60020):
java.io.IOException: Timed out on getting lock for row=hrow-1
at org.apache.hadoop.hbase.regionserver.HRegion.internalObtainRowLock(HRegion.java:3588)
at org.apache.hadoop.hbase.regionserver.HRegion.getLock(HRegion.java:3678)
at org.apache.hadoop.hbase.regionserver.HRegion.getLock(HRegion.java:3662)
at org.apache.hadoop.hbase.regionserver.HRegion.checkAndMutate(HRegion.java:2723)
at org.apache.hadoop.hbase.regionserver.HRegionServer.checkAndMutate(HRegionServer.java:2307)
at org.apache.hadoop.hbase.regionserver.HRegionServer.checkAndPut(HRegionServer.java:2345)
at sun.reflect.GeneratedMethodAccessor31.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.hadoop.hbase.ipc.WritableRpcEngine$Server.call(WritableRpcEngine.java:354)
at org.apache.hadoop.hbase.ipc.HBaseServer$Handler.run(HBaseServer.java:1434)
Questions:

- Is it allowed to execute checkAndPut from within a prePut coprocessor?
- If not, what else would guarantee that in a concurrent environment, where multiple concurrent workers may write to the same ttt row, the d:k value is assigned exactly once?
The actual problem was an infinite loop: the prePut coprocessor calls .put or .checkAndPut, which in turn triggers the prePut coprocessor again.

To break the loop, I implemented the following approach:

- add a marker to the Put being created
- at the top of the coprocessor, check whether the marker is present. If yes, remove the marker and skip the coprocessor. If no, this is a new request, not previously initiated by this coprocessor, so continue the flow.
public static final byte[] DIM_FAMILY = "d".getBytes();
public static final byte[] COLUMN_KEY = "k".getBytes();
public static final byte[] COLUMN_MARKER = "marker".getBytes();
public static final byte[] VALUE_MARKER = "+".getBytes();
public static final TableName TABLE_COUNTERS = TableName.valueOf("counters");
public static final byte[] COUNTER_FAMILY = "c".getBytes();
public static final byte[] COUNTER_ROWKEY = "rowkey_counter".getBytes();
public static final byte[] COUNTER_KEY = "key_counter".getBytes();

@Override
public void prePut(final ObserverContext<RegionCoprocessorEnvironment> observerContext,
                   final Put put, final WALEdit edit, final Durability durability) throws IOException {
    if (put.has(DIM_FAMILY, COLUMN_MARKER)) {
        removeColumnMutations(put, COLUMN_MARKER);
        return; // return from the coprocessor; otherwise an infinite loop will occur
    }
    HRegion region = observerContext.getEnvironment().getRegion();
    Table tableCounters = null;
    Connection connectionCounters = null;
    try {
        // check whether the key column for the row is empty
        Get existingEdwGet = new Get(put.getRow());
        existingEdwGet.addColumn(DIM_FAMILY, COLUMN_KEY);
        List<Cell> existingEdwCells = region.get(existingEdwGet, false);
        // if the key value is empty, assign one immediately
        if (existingEdwCells.isEmpty()) {
            // increment the key_counter
            connectionCounters = ConnectionFactory.createConnection(configuration);
            tableCounters = connectionCounters.getTable(TABLE_COUNTERS);
            long newEdwKey = tableCounters.incrementColumnValue(COUNTER_ROWKEY, COUNTER_FAMILY, COUNTER_KEY, 1);
            // form a Put with the new key value and a marker, showing that this insert should not be intercepted again
            Put keySetter = new Put(put.getRow());
            keySetter.addColumn(DIM_FAMILY, COLUMN_KEY, Bytes.toBytes(newEdwKey));
            keySetter.addColumn(DIM_FAMILY, COLUMN_MARKER, VALUE_MARKER);
            // consider the checkAndMutate return value, and increment a sequence-hole counter if needed
            boolean isNew = region.checkAndMutate(keySetter.getRow(), DIM_FAMILY, COLUMN_KEY,
                    CompareFilter.CompareOp.EQUAL, new BinaryComparator(null), keySetter, true);
        }
    } finally {
        releaseCloseable(tableCounters);
        releaseCloseable(connectionCounters);
    }
}
Notes:

- the coprocessor above is adapted to the HBase 1.0 SDK
- instead of opening a connection to the underlying table, it uses the HBase Region instance from the RegionCoprocessorEnvironment context
- the utility method removeColumnMutations can be omitted; its only purpose is to remove the marker from the Put
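The removeColumnMutations helper is not shown either. A hypothetical reconstruction of its logic is sketched below; to keep it self-contained it models the Put's family map as plain Java collections, whereas the real version would iterate Put#getFamilyCellMap() and compare qualifier bytes the same way:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;

public class MutationUtils {
    // Hypothetical sketch of removeColumnMutations: strip every cell whose
    // qualifier equals the given column from a family -> cells map, then
    // drop families left empty so the mutation stays well-formed.
    // Each cell is modeled here as a byte[][] pair {qualifier, value}.
    public static void removeColumnMutations(Map<byte[], List<byte[][]>> familyMap,
                                             byte[] qualifier) {
        for (List<byte[][]> cells : familyMap.values()) {
            cells.removeIf(cell -> Arrays.equals(cell[0], qualifier));
        }
        familyMap.values().removeIf(List::isEmpty);
    }
}
```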