How to call batchWriteItemAsync to load DynamoDB in scala/spark
I am trying to run batchWriteItemAsync from Scala (Spark) to load data into DynamoDB, but the code fails to compile with the type-mismatch error below. How do I handle batchWriteItemAsync correctly in Scala?
Code:
import com.amazonaws.services.dynamodbv2.model._
import com.amazonaws.services.dynamodbv2.{AmazonDynamoDBAsyncClientBuilder, AmazonDynamoDBAsync, AmazonDynamoDBClientBuilder, AmazonDynamoDB}
import java.util.concurrent.Executors
import scala.collection.JavaConverters._ // needed for .asJava
import scala.concurrent.{ExecutionContext, Future}

val POOL_SIZE = 3
val client: AmazonDynamoDBAsync = AmazonDynamoDBAsyncClientBuilder.standard().withRegion(REGION).build()

// init thread pool
val jobExecutorPool = Executors.newFixedThreadPool(POOL_SIZE)
// create the implicit ExecutionContext based on our thread pool
implicit val xc: ExecutionContext = ExecutionContext.fromExecutorService(jobExecutorPool)

var contentSeq = ...
val batchWriteItems: java.util.List[WriteRequest] = contentSeq.asJava

def batchWriteFuture(tableName: String, batchWriteItems: java.util.List[WriteRequest])(implicit xc: ExecutionContext): Future[BatchWriteItemResult] = {
  client.batchWriteItemAsync(
    (new BatchWriteItemRequest()).withRequestItems(Map(tableName -> batchWriteItems).asJava)
  )
}
batchWriteFuture(tableName, batchWriteItems)
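(contentSeq is elided in the post; purely for illustration, a Seq[WriteRequest] for a batch of puts could be built roughly as follows. The records value and the "id"/"payload" attribute names are hypothetical and not from the original code.)

// Hypothetical input data, only to make the sketch self-contained.
val records: Seq[(String, String)] = Seq(("1", "hello"), ("2", "world"))

// One WriteRequest wrapping a PutRequest per item; attribute names are placeholders.
val contentSeqExample: Seq[WriteRequest] = records.map { case (id, payload) =>
  new WriteRequest().withPutRequest(
    new PutRequest().withItem(
      Map(
        "id"      -> new AttributeValue().withS(id),
        "payload" -> new AttributeValue().withS(payload)
      ).asJava
    )
  )
}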
Error:
error: type mismatch;
found : java.util.concurrent.Future[com.amazonaws.services.dynamodbv2.model.BatchWriteItemResult]
required: scala.concurrent.Future[com.amazonaws.services.dynamodbv2.model.BatchWriteItemResult]
client.batchWriteItemAsync(
client.batchWriteItemAsync is part of the Java API, so it returns a java.util.concurrent.Future, while your method batchWriteFuture declares the return type scala.concurrent.Future. Convert the result of the DynamoDB call to a Scala future, for example:
def batchWriteFuture(tableName: String, batchWriteItems: java.util.List[WriteRequest])(implicit xc: ExecutionContext): Future[BatchWriteItemResult] = {
  client.batchWriteItemAsync(
    (new BatchWriteItemRequest()).withRequestItems(Map(tableName -> batchWriteItems).asJava)
  ).asScala // <---------- converting to a Scala future
}
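Note that a plain java.util.concurrent.Future has no .asScala in the standard library's JavaConverters, so the snippet above assumes a suitable Java-to-Scala future conversion is already in scope. A dependency-free alternative is to use the AsyncHandler overload of batchWriteItemAsync and complete a Scala Promise from the callback; a minimal sketch under that assumption:

// Sketch: adapt the SDK's callback-based async call to scala.concurrent.Future
// by completing a Promise from an AsyncHandler (uses the same `client` as above).
import com.amazonaws.handlers.AsyncHandler
import scala.concurrent.Promise

def batchWriteFuture(tableName: String, batchWriteItems: java.util.List[WriteRequest]): Future[BatchWriteItemResult] = {
  val promise = Promise[BatchWriteItemResult]()
  val request = new BatchWriteItemRequest().withRequestItems(Map(tableName -> batchWriteItems).asJava)
  client.batchWriteItemAsync(request, new AsyncHandler[BatchWriteItemRequest, BatchWriteItemResult] {
    override def onSuccess(req: BatchWriteItemRequest, result: BatchWriteItemResult): Unit = promise.success(result)
    override def onError(e: Exception): Unit = promise.failure(e)
  })
  promise.future // already a scala.concurrent.Future, no conversion needed
}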