Spark Streaming Kinesis Integration: Error while initializing LeaseCoordinator in Worker

I'm running into some problems with a simple vanilla Spark Streaming application that consumes from Kinesis, written in Scala. I followed the basic guidance from a few tutorials, such as Snowplow and WordCountASL.

But I still can't get it to work because of this Kinesis Worker error:

16/11/15 09:00:27 ERROR Worker: Caught exception when initializing LeaseCoordinator
com.amazonaws.services.kinesis.leases.exceptions.DependencyException: com.amazonaws.AmazonClientException: Unable to load AWS credentials from any provider in the chain
    at com.amazonaws.services.kinesis.leases.impl.LeaseManager.createLeaseTableIfNotExists(LeaseManager.java:125)
    at com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibLeaseCoordinator.initialize(KinesisClientLibLeaseCoordinator.java:173)
    at com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.initialize(Worker.java:374)
    at com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.run(Worker.java:318)
    at org.apache.spark.streaming.kinesis.KinesisReceiver$$anon.run(KinesisReceiver.scala:174)
Caused by: com.amazonaws.AmazonClientException: Unable to load AWS credentials from any provider in the chain
    at com.amazonaws.auth.AWSCredentialsProviderChain.getCredentials(AWSCredentialsProviderChain.java:117)
    at com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient.invoke(AmazonDynamoDBClient.java:1758)
    at com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient.createTable(AmazonDynamoDBClient.java:822)
    at com.amazonaws.services.kinesis.leases.impl.LeaseManager.createLeaseTableIfNotExists(LeaseManager.java:118)
    ... 4 more

Here is my code sample:

import com.amazonaws.auth.BasicAWSCredentials
import com.amazonaws.internal.StaticCredentialsProvider
import com.amazonaws.regions.RegionUtils
import com.amazonaws.services.kinesis.AmazonKinesisClient
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kinesis.KinesisUtils
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Milliseconds, StreamingContext}

/**
  * Created by franco on 11/11/16.
  */
object TestApp {
  // === Configurations for Kinesis streams ===
  val awsAccessKeyId = "XXXXXX"
  val awsSecretKey = "XXXXXXX"
  val kinesisStreamName = "MyStream"
  val kinesisEndpointUrl = "https://kinesis.region.amazonaws.com" //example "https://kinesis.us-west-2.amazonaws.com"
  val appName = "MyAppName"

  def main(args: Array[String]): Unit = {

    val credentials = new BasicAWSCredentials(awsAccessKeyId, awsSecretKey)

    val provider = new StaticCredentialsProvider(credentials)

    // This client (with explicit static credentials) is only used here in
    // the driver, to look up the shard count
    val kinesisClient = new AmazonKinesisClient(provider)
    kinesisClient.setEndpoint(kinesisEndpointUrl)

    val shards = kinesisClient.describeStream(kinesisStreamName).getStreamDescription.getShards.size()

    // One DStream (and therefore one receiver) per shard
    val streams = shards

    val batchInterval = Milliseconds(2000)

    val kinesisCheckpointInterval = batchInterval

    val regionName = RegionUtils.getRegionByEndpoint(kinesisEndpointUrl).getName

    val cores : Int = Runtime.getRuntime.availableProcessors()
    println("Available Cores : " + cores.toString)
    // Each receiver pins one core, so keep some headroom in local mode
    val config = new SparkConf().setAppName(appName).setMaster("local[" + (cores / 2) + "]")
    val ssc = new StreamingContext(config, batchInterval)

    // Create the Kinesis DStreams
    val kinesisStreams = (0 until streams).map { i =>
      KinesisUtils.createStream(ssc, appName, kinesisStreamName, kinesisEndpointUrl, regionName,
        InitialPositionInStream.LATEST, kinesisCheckpointInterval * 2, StorageLevel.MEMORY_AND_DISK_2)
    }

    // Union the per-shard streams and print each record as a string
    ssc.union(kinesisStreams).map(bytes => new String(bytes)).print()
    // Start the streaming context and await termination
    ssc.start()
    ssc.awaitTermination()
  }


}

My IAM policy looks like this:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "Stmt123",
            "Effect": "Allow",
            "Action": [
                "kinesis:DescribeStream",
                "kinesis:GetShardIterator",
                "kinesis:GetRecords"
            ],
            "Resource": [
                "arn:aws:kinesis:region:account:stream/name"
            ]
        },
        {
            "Sid": "Stmt456",
            "Effect": "Allow",
            "Action": [
                "dynamodb:CreateTable",
                "dynamodb:DeleteItem",
                "dynamodb:DescribeTable",
                "dynamodb:GetItem",
                "dynamodb:PutItem",
                "dynamodb:Scan",
                "dynamodb:UpdateItem"
            ],
            "Resource": [
                "arn:aws:dynamodb:region:account:table/name"
            ]
        },
        {
            "Sid": "Stmt789",
            "Effect": "Allow",
            "Action": [
                "cloudwatch:PutMetricData"
            ],
            "Resource": [
                "*"
            ]
        }
    ]
}

I don't understand what is wrong with this application. Any guidance on this topic would be greatly appreciated.

In the end I got it working by setting the credential values as JVM system properties:

System.setProperty("aws.accessKeyId","XXXXXX")
System.setProperty("aws.secretKey","XXXXXX")

But I'm still not entirely happy with this solution.

Do you see anything wrong with this approach?

There are other DStream constructors that let you pass in the AWS access key and secret key directly.

For example, the 1st and 5th constructors in the link below will allow you to pass them in the constructor (and they should get passed through your system appropriately) instead of having to set a system property.

KinesisUtil Constructors
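
For reference, here is a minimal sketch of that approach, reusing the vals from the code above and assuming the createStream overload in spark-streaming-kinesis-asl that takes awsAccessKeyId and awsSecretKey as its last two parameters (check the constructor list in the link above against your Spark version):

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kinesis.KinesisUtils
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream

// Passing the keys here ships them with the receiver, so the workers never
// fall back to the default provider chain
val stream = KinesisUtils.createStream(
  ssc, appName, kinesisStreamName, kinesisEndpointUrl, regionName,
  InitialPositionInStream.LATEST, kinesisCheckpointInterval,
  StorageLevel.MEMORY_AND_DISK_2,
  awsAccessKeyId, awsSecretKey)

The keys still end up serialized with the receiver, so treat them like any other secret: load them from configuration rather than hard-coding them.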