Spark Streaming Kinesis Integration: Error while initializing LeaseCoordinator in Worker
I'm having some issues running a simple, vanilla Spark Streaming application with Kinesis in Scala. I followed the basic guidance from tutorials such as Snowplow and WordCountASL.
But I still can't get it working, because of this Kinesis Worker error:
16/11/15 09:00:27 ERROR Worker: Caught exception when initializing LeaseCoordinator
com.amazonaws.services.kinesis.leases.exceptions.DependencyException: com.amazonaws.AmazonClientException: Unable to load AWS credentials from any provider in the chain
at com.amazonaws.services.kinesis.leases.impl.LeaseManager.createLeaseTableIfNotExists(LeaseManager.java:125)
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibLeaseCoordinator.initialize(KinesisClientLibLeaseCoordinator.java:173)
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.initialize(Worker.java:374)
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.run(Worker.java:318)
at org.apache.spark.streaming.kinesis.KinesisReceiver$$anon.run(KinesisReceiver.scala:174)
Caused by: com.amazonaws.AmazonClientException: Unable to load AWS credentials from any provider in the chain
at com.amazonaws.auth.AWSCredentialsProviderChain.getCredentials(AWSCredentialsProviderChain.java:117)
at com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient.invoke(AmazonDynamoDBClient.java:1758)
at com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient.createTable(AmazonDynamoDBClient.java:822)
at com.amazonaws.services.kinesis.leases.impl.LeaseManager.createLeaseTableIfNotExists(LeaseManager.java:118)
... 4 more
Here is my code sample:
import com.amazonaws.auth.BasicAWSCredentials
import com.amazonaws.internal.StaticCredentialsProvider
import com.amazonaws.regions.RegionUtils
import com.amazonaws.services.kinesis.AmazonKinesisClient
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kinesis.KinesisUtils
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Milliseconds, StreamingContext}

/**
 * Created by franco on 11/11/16.
 */
object TestApp {

  // === Configurations for Kinesis streams ===
  val awsAccessKeyId = "XXXXXX"
  val awsSecretKey = "XXXXXXX"
  val kinesisStreamName = "MyStream"
  val kinesisEndpointUrl = "https://kinesis.region.amazonaws.com" // example: "https://kinesis.us-west-2.amazonaws.com"
  val appName = "MyAppName"

  def main(args: Array[String]): Unit = {
    val credentials = new BasicAWSCredentials(awsAccessKeyId, awsSecretKey)
    val provider = new StaticCredentialsProvider(credentials)

    val kinesisClient = new AmazonKinesisClient(provider)
    kinesisClient.setEndpoint(kinesisEndpointUrl)

    val shards = kinesisClient.describeStream(kinesisStreamName).getStreamDescription.getShards.size()
    val streams = shards

    val batchInterval = Milliseconds(2000)
    val kinesisCheckpointInterval = batchInterval
    val regionName = RegionUtils.getRegionByEndpoint(kinesisEndpointUrl).getName

    val cores: Int = Runtime.getRuntime.availableProcessors()
    println("Available Cores : " + cores.toString)

    val config = new SparkConf().setAppName("MyAppName").setMaster("local[" + (cores / 2) + "]")
    val ssc = new StreamingContext(config, batchInterval)

    // Create the Kinesis DStreams
    val kinesisStreams = (0 until streams).map { i =>
      KinesisUtils.createStream(ssc, appName, kinesisStreamName, kinesisEndpointUrl, regionName,
        InitialPositionInStream.LATEST, kinesisCheckpointInterval * 2, StorageLevel.MEMORY_AND_DISK_2)
    }

    ssc.union(kinesisStreams).map(bytes => new String(bytes)).print()

    // Start the streaming context and await termination
    ssc.start()
    ssc.awaitTermination()
  }
}
My IAM policy looks like this:
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "Stmt123",
      "Effect": "Allow",
      "Action": [
        "kinesis:DescribeStream",
        "kinesis:GetShardIterator",
        "kinesis:GetRecords"
      ],
      "Resource": [
        "arn:aws:kinesis:region:account:stream/name"
      ]
    },
    {
      "Sid": "Stmt456",
      "Effect": "Allow",
      "Action": [
        "dynamodb:CreateTable",
        "dynamodb:DeleteItem",
        "dynamodb:DescribeTable",
        "dynamodb:GetItem",
        "dynamodb:PutItem",
        "dynamodb:Scan",
        "dynamodb:UpdateItem"
      ],
      "Resource": [
        "arn:aws:dynamodb:region:account:table/name"
      ]
    },
    {
      "Sid": "Stmt789",
      "Effect": "Allow",
      "Action": [
        "cloudwatch:PutMetricData"
      ],
      "Resource": [
        "*"
      ]
    }
  ]
}
I can't see what's wrong with this application. Any guidance on this would be appreciated.
In the end I got it working by setting the credential values as system properties:
System.setProperty("aws.accessKeyId","XXXXXX")
System.setProperty("aws.secretKey","XXXXXX")
But I'm still not really "happy" with this solution.
Do you see anything wrong with this approach?
There are other DStream constructors that let you pass in the AWS access key and secret key.
For example, the first and fifth constructors in the link below allow you to pass them directly to the constructor (and they get propagated through your system) rather than having to set a system property.
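As a rough illustration, here is a minimal sketch using the values from the question. It assumes the KinesisUtils.createStream overload in spark-streaming-kinesis-asl (Spark 1.6/2.x) that takes awsAccessKeyId and awsSecretKey as trailing parameters; check the signature against the API docs for your Spark version.

    // Minimal sketch (assumed overload): passing the keys here means the receiver's
    // KCL worker gets explicit credentials instead of falling back to the default
    // provider chain, which is what triggered the DependencyException above.
    val kinesisStreams = (0 until streams).map { i =>
      KinesisUtils.createStream(
        ssc, appName, kinesisStreamName, kinesisEndpointUrl, regionName,
        InitialPositionInStream.LATEST, kinesisCheckpointInterval * 2,
        StorageLevel.MEMORY_AND_DISK_2,
        awsAccessKeyId, awsSecretKey)
    }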