SSL error when testing kinesis with localstack + test containers
I am running into an issue while trying to connect to Kinesis running in a localstack container. I made a small sample test using testcontainers, and I get the same error in my application.
Here is the error:
[kpl-daemon-0003] WARN com.amazonaws.services.kinesis.producer.LogInputStreamReader - [2022-03-29 11:59:40.407741] [0x00002233][0x0000700003935000] [warning] [AWS Log: ERROR](CurlHttpClient)Curl returned error code 60 - SSL peer certificate or SSH remote key was not OK
[kpl-daemon-0003] WARN com.amazonaws.services.kinesis.producer.LogInputStreamReader - [2022-03-29 11:59:40.407806] [0x00002233][0x0000700003935000] [warning] [AWS Log: ERROR](AWSClient)HTTP response code: -1
Resolved remote host IP address: 127.0.0.1
Request ID:
Exception name:
Error message: curlCode: 60, SSL peer certificate or SSH remote key was not OK
0 response headers:
[kpl-daemon-0003] INFO com.amazonaws.services.kinesis.producer.LogInputStreamReader - [2022-03-29 11:59:40.407825] [0x00002233][0x0000700003935000] [info] [AWS Log: WARN](AWSClient)If the signature check failed. This could be because of a time skew. Attempting to adjust the signer.
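The error repeats many times until the test finishes. How can I fix this error and get the connection working? I have tried updating my system. I also opened the container's CLI and checked that its time matches my system time (can a container even have a different time from my system?).
Here is the sample test.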
@Testcontainers
public class KinesisTest {

    static class TestProcessorFactory implements ShardRecordProcessorFactory {
        private final TestKinesisRecordService service;

        public TestProcessorFactory(TestKinesisRecordService service) {
            this.service = service;
        }

        @Override
        public ShardRecordProcessor shardRecordProcessor() {
            return new TestRecordProcessor(service);
        }
    }

    static class TestRecordProcessor implements ShardRecordProcessor {
        public final TestKinesisRecordService service;

        public TestRecordProcessor(TestKinesisRecordService service) {
            this.service = service;
        }

        @Override
        public void initialize(InitializationInput initializationInput) {
        }

        @Override
        public void processRecords(ProcessRecordsInput processRecordsInput) {
            service.addRecord(processRecordsInput);
        }

        @Override
        public void leaseLost(LeaseLostInput leaseLostInput) {
        }

        @Override
        public void shardEnded(ShardEndedInput shardEndedInput) {
        }

        @Override
        public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
        }
    }

    static class TestKinesisRecordService {
        private List<ProcessRecordsInput> records = Collections.synchronizedList(new ArrayList<>());

        public void addRecord(ProcessRecordsInput processRecordsInput) {
            records.add(processRecordsInput);
        }

        public List<ProcessRecordsInput> getRecords() {
            return Collections.unmodifiableList(records);
        }
    }

    public static final String streamName = "stream-name";
    public static final String partitionKey = "partition-key";

    DockerImageName localstackImage = DockerImageName.parse("localstack/localstack:0.14.1");

    @Container
    public LocalStackContainer localstack = new LocalStackContainer(localstackImage)
            .withServices(KINESIS)
            .withEnv("KINESIS_INITIALIZE_STREAMS", streamName + ":1");

    public Scheduler scheduler;
    public TestKinesisRecordService service = new TestKinesisRecordService();
    public KinesisProducer producer;

    @BeforeEach
    void setup() {
        KinesisAsyncClient kinesisClient = KinesisClientUtil.createKinesisAsyncClient(
                KinesisAsyncClient.builder().endpointOverride(localstack.getEndpointOverride(KINESIS)).region(Region.of(localstack.getRegion()))
        );
        DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(Region.of(localstack.getRegion())).endpointOverride(localstack.getEndpointOverride(DYNAMODB)).build();
        CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(Region.of(localstack.getRegion())).endpointOverride(localstack.getEndpointOverride(CLOUDWATCH)).build();
        ConfigsBuilder configsBuilder = new ConfigsBuilder(streamName, "KinesisPratTest", kinesisClient, dynamoClient, cloudWatchClient, UUID.randomUUID().toString(), new TestProcessorFactory(service));
        scheduler = new Scheduler(
                configsBuilder.checkpointConfig(),
                configsBuilder.coordinatorConfig(),
                configsBuilder.leaseManagementConfig(),
                configsBuilder.lifecycleConfig(),
                configsBuilder.metricsConfig(),
                configsBuilder.processorConfig(),
                configsBuilder.retrievalConfig()
        );
        producer = producer();
    }

    @AfterEach
    public void teardown() throws ExecutionException, InterruptedException, TimeoutException {
        producer.destroy();
        Future<Boolean> gracefulShutdownFuture = scheduler.startGracefulShutdown();
        gracefulShutdownFuture.get(20, TimeUnit.SECONDS);
    }

    public KinesisProducer producer() {
        var configuration = new KinesisProducerConfiguration()
                .setRecordMaxBufferedTime(100)
                .setRecordTtl(30000)
                .setRequestTimeout(5000)
                .setCredentialsProvider(localstack.getDefaultCredentialsProvider())
                .setMetricsCredentialsProvider(localstack.getDefaultCredentialsProvider())
                .setRegion(localstack.getRegion())
                .setCloudwatchEndpoint(localstack.getEndpointOverride(CLOUDWATCH).getHost())
                .setCloudwatchPort(localstack.getEndpointOverride(CLOUDWATCH).getPort())
                .setKinesisEndpoint(localstack.getEndpointOverride(KINESIS).getHost())
                .setKinesisPort(localstack.getEndpointOverride(KINESIS).getPort());
        return new KinesisProducer(configuration);
    }

    @Test
    void test() {
        producer.addUserRecord(streamName, partitionKey, ByteBuffer.wrap("Hello".getBytes(StandardCharsets.UTF_8)));
        await().until(() -> service.getRecords(), records -> records.size() > 0);
    }
}
Error message: curlCode: 60, SSL peer certificate or SSH remote key was not OK
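It sounds like the hostname doesn't match the certificate, or you don't trust the CA that signed it.
You can disable certificate verification by setting .setVerifyCertificate(false) on the KinesisProducerConfiguration.
Otherwise, see about resolving the cert/trust issue.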
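As a rough illustration of the first option, the producer from the question could be built with certificate verification switched off. This is only a sketch for local tests: the class name `LocalKinesisProducerFactory` and its `create` method are hypothetical, and the host, port, region and credentials are assumed to come from the LocalStack container as in the question (for example `localstack.getEndpointOverride(KINESIS).getHost()`).

```java
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.services.kinesis.producer.KinesisProducer;
import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;

// Hypothetical helper, not part of the KPL or of Testcontainers.
public final class LocalKinesisProducerFactory {

    private LocalKinesisProducerFactory() {
    }

    // All arguments are assumed to be read from the running LocalStack container.
    public static KinesisProducer create(String host, int port, String region,
                                         AWSCredentialsProvider credentials) {
        KinesisProducerConfiguration configuration = new KinesisProducerConfiguration()
                .setCredentialsProvider(credentials)
                .setMetricsCredentialsProvider(credentials)
                .setRegion(region)
                .setKinesisEndpoint(host)
                .setKinesisPort(port)
                .setCloudwatchEndpoint(host)
                .setCloudwatchPort(port)
                // Test-only setting: the native KPL daemon stops verifying the
                // TLS certificate presented by the endpoint.
                .setVerifyCertificate(false);
        return new KinesisProducer(configuration);
    }
}
```

In the test above this would replace the body of `producer()`. The setting should stay out of production configuration, where verification is what protects the connection.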