HBase Table.batch takes 300 secs to insert 800,000 entries into table
I am reading a JSON file of about 30 MB and processing it to create column families and key values. I then create Put objects, insert the rowkey and values into them, build a list of such Put objects, and call Table.batch() with that list. I call it whenever my ArrayList reaches a size of 50,000, then clear the list and start the next batch. However, processing the file, which ends up with 800,000 entries, takes 300 seconds. I also tried table.put, but it was even slower. I am using HBase 1.1 and I get the JSON from Kafka. Any suggestions to improve performance are appreciated. I checked the SO forums but did not find much help. I am sharing the code in case you want to take a look.
Regards,
Raghavendra
public static void processData(String jsonData)
{
    if (jsonData == null || jsonData.isEmpty())
    {
        System.out.println("JSON data is null or empty. Nothing to process");
        return;
    }
    long startTime = System.currentTimeMillis();
    Table table = null;
    try
    {
        table = HBaseConfigUtil.getInstance().getConnection().getTable(TableName.valueOf("MYTABLE"));
    }
    catch (IOException e1)
    {
        System.out.println(e1);
    }
    Put processData = null;
    List<Put> bulkData = new ArrayList<Put>();
    try
    {
        //Read the json and generate the model into a class
        //ProcessExecutions is List<ProcessExecution>
        ProcessExecutions peData = JsonToColumnData.gson.fromJson(jsonData, ProcessExecutions.class);
        if (peData != null)
        {
            //Read the data and pass it to HBase
            for (ProcessExecution pe : peData.processExecutions)
            {
                //Class Header stores some header information
                Header headerData = pe.getHeader();
                String rowKey = headerData.getRowKey();
                processData = new Put(Bytes.toBytes(rowKey));
                processData.addColumn(Bytes.toBytes("Data"),
                                      Bytes.toBytes("Time"),
                                      Bytes.toBytes("value"));
                //Add to list
                bulkData.add(processData);
                if (bulkData.size() >= 50000) //hardcoded for demo
                {
                    long tmpTime = System.currentTimeMillis();
                    //results must be the same size as the action list
                    Object[] results = new Object[bulkData.size()];
                    table.batch(bulkData, results);
                    bulkData.clear();
                    System.gc();
                }
            } //end for
            //Complete the remaining write operation
            if (bulkData.size() > 0)
            {
                Object[] results = new Object[bulkData.size()];
                table.batch(bulkData, results);
                bulkData.clear();
                //Try to free memory
                System.gc();
            }
        }
    }
    catch (Exception e)
    {
        System.out.println(e);
        e.printStackTrace();
    }
    finally
    {
        try
        {
            if (table != null)
            {
                table.close();
            }
        }
        catch (IOException e)
        {
            System.out.println("Error closing table " + e);
            e.printStackTrace();
        }
    }
}
//This function is added here to show the connection
/*public Connection getConnection()
{
    try
    {
        if (this.connection == null)
        {
            ExecutorService executor = Executors.newFixedThreadPool(HBaseConfigUtil.THREADCOUNT);
            this.connection = ConnectionFactory.createConnection(this.getHBaseConfiguration(), executor);
        }
    }
    catch (IOException e)
    {
        e.printStackTrace();
        System.out.println("Error in getting connection " + e.getMessage());
    }
    return this.connection;
}*/
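For reference, the results array that Table.batch() fills can be inspected for per-row failures; per the Table.batch javadoc, a null entry means that action failed even after retries, and some client versions place a Throwable in the failed slot instead. Below is a minimal helper sketch (the checkBatchResults name is hypothetical and not part of the original code; it reuses the Bytes utility already used above):
private static void checkBatchResults(List<Put> actions, Object[] results)
{
    for (int i = 0; i < results.length; i++)
    {
        Object r = results[i];
        //null (or a Throwable, depending on client version) marks a failed action
        if (r == null || r instanceof Throwable)
        {
            System.out.println("Batch action " + i + " failed for row "
                    + Bytes.toString(actions.get(i).getRow()) + ": " + r);
        }
    }
}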
I had the same situation, where I needed to parse 5 GB of JSON and insert it into an HBase table... You can try the approach below (it should work); it turned out to be very fast in my case with batches of 100,000 records.
public void addMultipleRecordsAtaShot(final ArrayList<Put> puts, final String tableName) throws Exception {
    HTable table = null;
    try {
        table = new HTable(HBaseConnection.getHBaseConfiguration(), getTable(tableName));
        table.put(puts);
        LOG.info("INSERT record[s] " + puts.size() + " to table " + tableName + " OK.");
    } catch (final Throwable e) {
        e.printStackTrace();
    } finally {
        LOG.info("Processed ---> " + puts.size());
        if (puts != null) {
            puts.clear();
        }
        if (table != null) {
            table.close(); //release the table handle
        }
    }
}
For more details on increasing the buffer size, please refer to the doc https://hbase.apache.org/apidocs/org/apache/hadoop/hbase/client/Table.html and to my answer on another question.
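To illustrate the buffer-size point, here is a minimal sketch using the HBase 1.x BufferedMutator API, which buffers Puts on the client and flushes them when the configured write buffer fills. It assumes the HBaseConfigUtil connection helper and the MYTABLE table from the question's code; the 8 MB buffer size is only an illustrative value, not something from the original post.
import java.util.List;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.BufferedMutatorParams;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Put;

public class BufferedWriteSketch {
    public static void writeBuffered(List<Put> puts) throws Exception {
        //Reuse the shared connection from the question's helper class
        Connection connection = HBaseConfigUtil.getInstance().getConnection();
        BufferedMutatorParams params = new BufferedMutatorParams(TableName.valueOf("MYTABLE"))
                .writeBufferSize(8 * 1024 * 1024); //illustrative: flush roughly every 8 MB of buffered Puts
        try (BufferedMutator mutator = connection.getBufferedMutator(params)) {
            mutator.mutate(puts); //buffered client-side, sent to region servers in batches
            mutator.flush();      //push out anything still sitting in the buffer
        }
    }
}
Whether this is faster than Table.batch for your workload depends on the cluster and the row-key distribution, so it is worth benchmarking against the existing approach.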