使用 localstack + 测试容器测试运动时出现 SSL 错误

SSL error when testing kinesis with localstack + test containers

我 运行 在尝试连接到 localstack 容器中的 kinesis 运行 时遇到了问题。我使用 testcontainers 做了一个小示例测试,但我在我的应用程序中遇到了同样的错误。

这是错误:

[kpl-daemon-0003] WARN com.amazonaws.services.kinesis.producer.LogInputStreamReader - [2022-03-29 11:59:40.407741] [0x00002233][0x0000700003935000] [warning] [AWS Log: ERROR](CurlHttpClient)Curl returned error code 60 - SSL peer certificate or SSH remote key was not OK
[kpl-daemon-0003] WARN com.amazonaws.services.kinesis.producer.LogInputStreamReader - [2022-03-29 11:59:40.407806] [0x00002233][0x0000700003935000] [warning] [AWS Log: ERROR](AWSClient)HTTP response code: -1
Resolved remote host IP address: 127.0.0.1
Request ID: 
Exception name: 
Error message: curlCode: 60, SSL peer certificate or SSH remote key was not OK
0 response headers:
[kpl-daemon-0003] INFO com.amazonaws.services.kinesis.producer.LogInputStreamReader - [2022-03-29 11:59:40.407825] [0x00002233][0x0000700003935000] [info] [AWS Log: WARN](AWSClient)If the signature check failed. This could be because of a time skew. Attempting to adjust the signer.

错误会重复多次,直到测试完成。如何修复此错误并使连接正常工作?我试过更新我的系统。我打开容器的 cli 并检查时间是否与我的系统时间匹配(容器甚至可以与我的系统有不同的时间吗?)。

这是示例测试。

@Testcontainers
public class KinesisTest {
    static class TestProcessorFactory implements ShardRecordProcessorFactory {

        private final TestKinesisRecordService service;

        public TestProcessorFactory(TestKinesisRecordService service) {
            this.service = service;
        }

        @Override
        public ShardRecordProcessor shardRecordProcessor() {
            return new TestRecordProcessor(service);
        }
    }

    static class TestRecordProcessor implements ShardRecordProcessor {

        public final TestKinesisRecordService service;

        public TestRecordProcessor(TestKinesisRecordService service) {
            this.service = service;
        }

        @Override
        public void initialize(InitializationInput initializationInput) {

        }

        @Override
        public void processRecords(ProcessRecordsInput processRecordsInput) {
            service.addRecord(processRecordsInput);
        }

        @Override
        public void leaseLost(LeaseLostInput leaseLostInput) {

        }

        @Override
        public void shardEnded(ShardEndedInput shardEndedInput) {

        }

        @Override
        public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {

        }
    }

    static class TestKinesisRecordService {
        private List<ProcessRecordsInput> records = Collections.synchronizedList(new ArrayList<>());

        public void addRecord(ProcessRecordsInput processRecordsInput) {
            records.add(processRecordsInput);
        }

        public List<ProcessRecordsInput> getRecords() {
            return Collections.unmodifiableList(records);
        }
    }

    public static final String streamName = "stream-name";
    public static final String partitionKey = "partition-key";

    DockerImageName localstackImage = DockerImageName.parse("localstack/localstack:0.14.1");

    @Container
    public LocalStackContainer localstack = new LocalStackContainer(localstackImage)
            .withServices(KINESIS)
            .withEnv("KINESIS_INITIALIZE_STREAMS", streamName + ":1");

    public Scheduler scheduler;
    public TestKinesisRecordService service = new TestKinesisRecordService();
    public KinesisProducer producer;

    @BeforeEach
    void setup() {
        KinesisAsyncClient kinesisClient = KinesisClientUtil.createKinesisAsyncClient(
                KinesisAsyncClient.builder().endpointOverride(localstack.getEndpointOverride(KINESIS)).region(Region.of(localstack.getRegion()))
        );
        DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(Region.of(localstack.getRegion())).endpointOverride(localstack.getEndpointOverride(DYNAMODB)).build();
        CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(Region.of(localstack.getRegion())).endpointOverride(localstack.getEndpointOverride(CLOUDWATCH)).build();

        ConfigsBuilder configsBuilder = new ConfigsBuilder(streamName, "KinesisPratTest", kinesisClient, dynamoClient, cloudWatchClient, UUID.randomUUID().toString(), new TestProcessorFactory(service));

        scheduler = new Scheduler(
                configsBuilder.checkpointConfig(),
                configsBuilder.coordinatorConfig(),
                configsBuilder.leaseManagementConfig(),
                configsBuilder.lifecycleConfig(),
                configsBuilder.metricsConfig(),
                configsBuilder.processorConfig(),
                configsBuilder.retrievalConfig()
        );

        producer = producer();
    }

    @AfterEach
    public void teardown() throws ExecutionException, InterruptedException, TimeoutException {
        producer.destroy();
        Future<Boolean> gracefulShutdownFuture = scheduler.startGracefulShutdown();
        gracefulShutdownFuture.get(20, TimeUnit.SECONDS);
    }

    public KinesisProducer producer() {
        var configuration = new KinesisProducerConfiguration()
                .setRecordMaxBufferedTime(100)
                .setRecordTtl(30000)
                .setRequestTimeout(5000)
                .setCredentialsProvider(localstack.getDefaultCredentialsProvider())
                .setMetricsCredentialsProvider(localstack.getDefaultCredentialsProvider())
                .setRegion(localstack.getRegion())
                .setCloudwatchEndpoint(localstack.getEndpointOverride(CLOUDWATCH).getHost())
                .setCloudwatchPort(localstack.getEndpointOverride(CLOUDWATCH).getPort())
                .setKinesisEndpoint(localstack.getEndpointOverride(KINESIS).getHost())
                .setKinesisPort(localstack.getEndpointOverride(KINESIS).getPort());

        return new KinesisProducer(configuration);
    }

    @Test
    void test() {
        producer.addUserRecord(streamName, partitionKey, ByteBuffer.wrap("Hello".getBytes(StandardCharsets.UTF_8)));
        await().until(() -> service.getRecords(), records -> records.size() > 0);
    }
}

Error message: curlCode: 60, SSL peer certificate or SSH remote key was not OK

听起来主机名与证书不匹配,或者您不信任签署它的 CA。

您可以设置选项以使用 KinesisProducerConfig 禁用主机名验证 .setVerifyCertificate(false)

否则,请参阅解决 cert/trust 问题。