在 Lambda 中使用 KCL 1.*:凭证
Use KCL 1.* in Lambda: Credentials
使用 https://github.com/aws/aws-sdk-java/tree/master/src/samples/AmazonKinesis 中提供的示例,我了解如何使用 KCL 从我的本地计算机读取 KinesisEvents。
我正在尝试在 Lambda 函数中实现相同的逻辑。
要设置 KinesisClientLibConfiguration,您需要提供一个 AWSCredentialsProvider。
我从哪里获得这些 AWSCredentials 以在 Lambda 中创建 kinesisClientLibConfiguration?
KinesisClientLibConfiguration kinesisClientLibConfiguration = new KinesisClientLibConfiguration(SAMPLE_APPLICATION_NAME, SAMPLE_APPLICATION_STREAM_NAME, credentialsProvider, workerId);
处理程序的完整代码如下所示:
public class ProcessKinesisRecords implements RequestHandler<KinesisEvent, Void> {
public static final String SAMPLE_APPLICATION_STREAM_NAME = "kinesis-s";
public static final String SAMPLE_APPLICATION_STREAM_REGION = "eu-west-1";
private static final String SAMPLE_APPLICATION_NAME = "SampleKinesisLambdaApplication";
private static final InitialPositionInStream SAMPLE_APPLICATION_INITIAL_POSITION_IN_STREAM =
InitialPositionInStream.LATEST;
private static ProfileCredentialsProvider credentialsProvider;
public Void handleRequest(KinesisEvent event, Context context) {
init();
int exitCode = 0;
try {
String workerId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + UUID.randomUUID();
KinesisClientLibConfiguration kinesisClientLibConfiguration =
new KinesisClientLibConfiguration(SAMPLE_APPLICATION_NAME,
SAMPLE_APPLICATION_STREAM_NAME,
credentialsProvider,
workerId);
kinesisClientLibConfiguration.withInitialPositionInStream(SAMPLE_APPLICATION_INITIAL_POSITION_IN_STREAM);
kinesisClientLibConfiguration.withRegionName(SAMPLE_APPLICATION_STREAM_REGION);
IRecordProcessorFactory recordProcessorFactory = new AmazonKinesisApplicationRecordProcessorFactory();
Worker worker = new Worker(recordProcessorFactory, kinesisClientLibConfiguration);
System.out.printf("Running %s to process stream %s as worker %s...\n",
SAMPLE_APPLICATION_NAME,
SAMPLE_APPLICATION_STREAM_NAME,
workerId);
worker.run();
} catch (Throwable e) {
System.err.println("Caught throwable while processing data.");
e.printStackTrace();
}
System.exit(exitCode);
return null;
}
private static void init() {
// Ensure the JVM will refresh the cached IP values of AWS resources (e.g. service endpoints).
java.security.Security.setProperty("networkaddress.cache.ttl", "60");
credentialsProvider = new ProfileCredentialsProvider();
try {
credentialsProvider.getCredentials();
} catch (Exception e) {
throw new AmazonClientException("Cannot load the credentials", e);
}
}
}
Lambda 确实为凭证提供了环境变量:
通过 EnvironmentVariableCredentialsProvider 访问它们:
使用 https://github.com/aws/aws-sdk-java/tree/master/src/samples/AmazonKinesis 中提供的示例,我了解如何使用 KCL 从我的本地计算机读取 KinesisEvents。
我正在尝试在 Lambda 函数中实现相同的逻辑。
要设置 KinesisClientLibConfiguration,您需要提供一个 AWSCredentialsProvider。
我从哪里获得这些 AWSCredentials 以在 Lambda 中创建 kinesisClientLibConfiguration?
KinesisClientLibConfiguration kinesisClientLibConfiguration = new KinesisClientLibConfiguration(SAMPLE_APPLICATION_NAME, SAMPLE_APPLICATION_STREAM_NAME, credentialsProvider, workerId);
处理程序的完整代码如下所示:
public class ProcessKinesisRecords implements RequestHandler<KinesisEvent, Void> {
public static final String SAMPLE_APPLICATION_STREAM_NAME = "kinesis-s";
public static final String SAMPLE_APPLICATION_STREAM_REGION = "eu-west-1";
private static final String SAMPLE_APPLICATION_NAME = "SampleKinesisLambdaApplication";
private static final InitialPositionInStream SAMPLE_APPLICATION_INITIAL_POSITION_IN_STREAM =
InitialPositionInStream.LATEST;
private static ProfileCredentialsProvider credentialsProvider;
public Void handleRequest(KinesisEvent event, Context context) {
init();
int exitCode = 0;
try {
String workerId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + UUID.randomUUID();
KinesisClientLibConfiguration kinesisClientLibConfiguration =
new KinesisClientLibConfiguration(SAMPLE_APPLICATION_NAME,
SAMPLE_APPLICATION_STREAM_NAME,
credentialsProvider,
workerId);
kinesisClientLibConfiguration.withInitialPositionInStream(SAMPLE_APPLICATION_INITIAL_POSITION_IN_STREAM);
kinesisClientLibConfiguration.withRegionName(SAMPLE_APPLICATION_STREAM_REGION);
IRecordProcessorFactory recordProcessorFactory = new AmazonKinesisApplicationRecordProcessorFactory();
Worker worker = new Worker(recordProcessorFactory, kinesisClientLibConfiguration);
System.out.printf("Running %s to process stream %s as worker %s...\n",
SAMPLE_APPLICATION_NAME,
SAMPLE_APPLICATION_STREAM_NAME,
workerId);
worker.run();
} catch (Throwable e) {
System.err.println("Caught throwable while processing data.");
e.printStackTrace();
}
System.exit(exitCode);
return null;
}
private static void init() {
// Ensure the JVM will refresh the cached IP values of AWS resources (e.g. service endpoints).
java.security.Security.setProperty("networkaddress.cache.ttl", "60");
credentialsProvider = new ProfileCredentialsProvider();
try {
credentialsProvider.getCredentials();
} catch (Exception e) {
throw new AmazonClientException("Cannot load the credentials", e);
}
}
}
Lambda 确实为凭证提供了环境变量:
通过 EnvironmentVariableCredentialsProvider 访问它们: