Put multiple items into DynamoDB with Java code
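I want to use the batchWriteItem method of the Amazon SDK to put a lot of items into a table.
I retrieve the items from Kinesis, and the stream has a lot of shards.
For a single item I used this method: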
public static void addSingleRecord(Item thingRecord) {
    // Write one item with PutItem.
    // dynamoDB (an AmazonDynamoDB client) and dataTable (the table name) are fields defined elsewhere.
    try {
        DynamoDB dynamo = new DynamoDB(dynamoDB);
        Table table = dynamo.getTable(dataTable);
        table.putItem(thingRecord);
    } catch (AmazonServiceException ase) {
        // The request reached AWS but was rejected with an error response.
        System.err.println("addThingsData request to AWS was rejected with an error response.");
        System.err.println("Error Message: " + ase.getMessage());
        System.err.println("HTTP Status Code: " + ase.getStatusCode());
        System.err.println("AWS Error Code: " + ase.getErrorCode());
        System.err.println("Error Type: " + ase.getErrorType());
        System.err.println("Request ID: " + ase.getRequestId());
    } catch (AmazonClientException ace) {
        // The client failed before getting a response, e.g. the network was unreachable.
        System.err.println("addThingsData - Caught an AmazonClientException, which means the client encountered "
                + "a serious internal problem while trying to communicate with AWS, "
                + "such as not being able to access the network.");
        System.err.println("Error Message: " + ace.getMessage());
    }
}

public static void addThings(String thingDatum) {
    // Parse the JSON payload into a Document API Item and write it.
    Item item = Item.fromJSON(thingDatum);
    addSingleRecord(item);
}
The items are passed in from:
private void processSingleRecord(Record record) {
    // Decode the Kinesis record payload (UTF-8 JSON) and write it to DynamoDB.
    String data = null;
    try {
        // For this app, we interpret the payload as UTF-8 chars.
        data = decoder.decode(record.getData()).toString();
        System.out.println("**processSingleRecord - data " + data);
        AmazonDynamoDBSample.addThings(data);
    } catch (NumberFormatException e) {
        LOG.info("Record does not match sample record format. Ignoring record with data: " + data);
    } catch (CharacterCodingException e) {
        LOG.error("Malformed data: " + data, e);
    }
}
Now, if I want to put many records, I use:
public static void writeMultipleItemsBatchWrite(Item thingRecord) {
    try {
        // dataTableWriteItems (a TableWriteItems for the target table) and dynamo (a DynamoDB instance) are fields.
        dataTableWriteItems.addItemToPut(thingRecord);
        System.out.println("Making the request.");
        BatchWriteItemOutcome outcome = dynamo.batchWriteItem(dataTableWriteItems);
        do {
            // Unprocessed items are returned, for example, when the request exceeds provisioned throughput.
            Map<String, List<WriteRequest>> unprocessedItems = outcome.getUnprocessedItems();
            if (unprocessedItems.isEmpty()) {
                System.out.println("No unprocessed items found");
            } else {
                System.out.println("Retrying the unprocessed items");
                outcome = dynamo.batchWriteItemUnprocessed(unprocessedItems);
            }
        } while (!outcome.getUnprocessedItems().isEmpty());
    } catch (Exception e) {
        System.err.println("Failed to write items: ");
        e.printStackTrace(System.err);
    }
}
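The retry loop above resends unprocessed items immediately. The BatchWriteItem documentation recommends retrying unprocessed items with exponential backoff, so here is a minimal sketch of that pattern. To keep it runnable without the AWS SDK, the actual write is abstracted as a hypothetical `Function` that takes the pending items and returns the ones left unprocessed; with the Document API that step would correspond to `dynamo.batchWriteItemUnprocessed(outcome.getUnprocessedItems())`. The class name `UnprocessedRetry` and the `maxAttempts`/`baseDelayMs` values are illustrative assumptions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class UnprocessedRetry {

    /**
     * Resubmits items until none remain unprocessed, sleeping with exponential
     * backoff between attempts.
     *
     * @param items       items for the first request (at most 25 for BatchWriteItem)
     * @param writeBatch  hypothetical stand-in for a batch write call: it receives
     *                    the items to write and returns those NOT processed
     * @param maxAttempts upper bound on the number of requests sent
     * @param baseDelayMs delay before the first retry; doubled after each retry
     * @return the number of requests sent
     */
    public static <T> int writeWithRetry(List<T> items,
                                         Function<List<T>, List<T>> writeBatch,
                                         int maxAttempts,
                                         long baseDelayMs) {
        List<T> pending = new ArrayList<>(items);
        long delay = baseDelayMs;
        int attempts = 0;
        while (!pending.isEmpty()) {
            if (attempts == maxAttempts) {
                throw new IllegalStateException(pending.size()
                        + " items still unprocessed after " + attempts + " attempts");
            }
            if (attempts > 0) {
                // Back off before resending the leftovers so throttled capacity can recover.
                try {
                    Thread.sleep(delay);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("interrupted while backing off", e);
                }
                delay *= 2;
            }
            pending = new ArrayList<>(writeBatch.apply(pending));
            attempts++;
        }
        return attempts;
    }

    public static void main(String[] args) {
        // Fake writer that only "accepts" the first 10 items of each request,
        // simulating throttling that leaves the rest unprocessed.
        Function<List<Integer>, List<Integer>> fake =
                batch -> batch.size() > 10 ? batch.subList(10, batch.size()) : new ArrayList<>();
        List<Integer> items = new ArrayList<>();
        for (int i = 0; i < 25; i++) {
            items.add(i);
        }
        int requests = writeWithRetry(items, fake, 5, 1);
        System.out.println("requests sent: " + requests); // 25 -> 15 -> 5 -> 0, so 3
    }
}
```

Capping the number of attempts keeps a persistently throttled table from looping forever; the caller can then decide whether to log, rethrow, or requeue the leftovers.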
But how do I send the last group? I only send a request once I have 25 items, but the last group has fewer.
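One common way to handle the final partial group is to buffer pending items, send a request whenever the buffer reaches 25 items (the per-request limit of BatchWriteItem), and explicitly flush the remainder at a natural boundary: for example, at the end of each Kinesis `processRecords` call and on shutdown. Flushing before you checkpoint also avoids marking records as processed before they are written. Below is a minimal sketch under those assumptions; `BatchBuffer` and its `Consumer` sink are hypothetical names, and the sink stands in for code that would build a `TableWriteItems` and call `dynamo.batchWriteItem`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/**
 * Collects items and hands them to a sink in groups of at most 25.
 * flush() sends a final, smaller group.
 */
public class BatchBuffer<T> {
    public static final int MAX_BATCH = 25;

    private final List<T> buffer = new ArrayList<>();
    // Hypothetical sink: real code would put these items into a TableWriteItems and call batchWriteItem.
    private final Consumer<List<T>> sink;

    public BatchBuffer(Consumer<List<T>> sink) {
        this.sink = sink;
    }

    /** Adds one item and sends a full batch as soon as 25 are buffered. */
    public void add(T item) {
        buffer.add(item);
        if (buffer.size() == MAX_BATCH) {
            flush();
        }
    }

    /** Sends whatever is buffered, even if fewer than 25 items; does nothing when empty. */
    public void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        sink.accept(new ArrayList<>(buffer)); // pass a copy so the sink may keep it
        buffer.clear();
    }

    public static void main(String[] args) {
        List<Integer> sentSizes = new ArrayList<>();
        BatchBuffer<String> writer = new BatchBuffer<>(batch -> sentSizes.add(batch.size()));

        // Simulate one processRecords call delivering 60 records.
        for (int i = 0; i < 60; i++) {
            writer.add("item-" + i);
        }
        writer.flush(); // end of the record batch: send the remaining 10

        System.out.println(sentSizes); // [25, 25, 10]
    }
}
```

Because the flush happens at the end of every record batch rather than only on a 25-item boundary, no item waits indefinitely for more data to arrive on a quiet shard.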
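You can use the Document SDK in a Lambda function attached to your Kinesis stream to write items to the DynamoDB table one at a time with PutItem or UpdateItem. This way, you can react to stream records as they appear in the stream without having to worry about whether more records are still to come. Behind the scenes, BatchWriteItem consumes the same number of write capacity units as the corresponding PutItem calls, so batching does not save capacity. A BatchWriteItem call is only as fast as the slowest PUT in the batch. Therefore, with BatchWriteItem you may see a higher average latency than with parallel PutItem/UpdateItem calls.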
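The latency argument can be illustrated with a toy model: if every item in a batch is only acknowledged when the slowest put finishes, each item effectively experiences the maximum latency, while independent parallel PutItem calls each complete on their own. The sketch below computes both averages; the latency values are hypothetical, and the model deliberately ignores per-request overhead, network effects, and throttling.

```java
import java.util.Arrays;

/**
 * Toy model: with a single batch, every item waits for the slowest put;
 * with parallel single-item puts, each item finishes on its own schedule.
 */
public class LatencyModel {

    /** Average per-item latency when all items go in one batch: each waits for the max. */
    public static double batchAverage(long[] putLatenciesMs) {
        return Arrays.stream(putLatenciesMs).max().orElse(0);
    }

    /** Average per-item latency when each item is written by its own parallel put call. */
    public static double parallelAverage(long[] putLatenciesMs) {
        return Arrays.stream(putLatenciesMs).average().orElse(0);
    }

    public static void main(String[] args) {
        // Hypothetical latencies (ms) for four puts; one of them is slow.
        long[] latencies = {10, 12, 14, 60};
        System.out.println("batch average:    " + batchAverage(latencies) + " ms");    // 60.0 ms
        System.out.println("parallel average: " + parallelAverage(latencies) + " ms"); // 24.0 ms
    }
}
```

In this model a single slow put raises the latency of every item in its batch, which is the effect the answer describes; real numbers depend on the workload and should be measured.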