Apache Flink - 无法为 FlinkKinesisConsumer 使用本地 Kinesis
Apache Flink - Unable to use local Kinesis for FlinkKinesisConsumer
到目前为止,我已经按照为 Flink 运动连接器记录的说明使用本地 Kinesis。
Using Non-AWS Kinesis Endpoints for Testing
Properties producerConfig = new Properties();
producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1");
producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
producerConfig.put(AWSConfigConstants.AWS_ENDPOINT, "http://localhost:4567");
对于 Flink 生产者,这些指令适用于本地运动(我使用 Kinesalite)。
但是,对于 Flink 消费者,我得到一个例外,即 aws.region
和 aws.endpoint
不允许 both。但是区域是必需的,这意味着它不可能覆盖端点。
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: For FlinkKinesisConsumer either AWS region ('aws.region') or AWS endpoint ('aws.endpoint') must be set in the config.
这是连接器中的错误吗?我看到一个相关的 PR:https://github.com/apache/flink/pull/6045 .
我找到了 workaround on Flink's mailing list,但他们将此描述为生产者而非消费者的问题,而我的看法恰恰相反(我认为),因此对此不确定。真是一头雾水。
此问题与验证检查中的 XOR
条件有关。
如您所见,validateConsumerConfiguration
方法在 if 语句中执行 XOR 验证。因此,您只能指定选中的两个参数之一。
要设置自定义 URL,您需要删除 AWSConfigConstants.AWS_REGION
属性并仅使用 link。
// Set the given URL
consumerConfig.put(AWSConfigConstants.AWS_ENDPOINT, URL);
// Remove the region
consumerConfig.remove(AWSConfigConstants.AWS_REGION);
此解决方案,修复与以下 StackTrace 相关的错误:
java.lang.IllegalArgumentException: For FlinkKinesisConsumer either AWS region ('aws.region') or AWS endpoint ('aws.endpoint') must be set in the config.
at org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil.validateConsumerConfiguration(KinesisConfigUtil.java:92)
自提出这个问题以来,已经取得了一些进展。
提问者在 this jira which was marked as a duplicate of this second jira 中推送了问题。
该问题现在应该已解决,并且可用于 1.10 及更高版本的修复程序。
到目前为止,我已经按照为 Flink 运动连接器记录的说明使用本地 Kinesis。
Using Non-AWS Kinesis Endpoints for Testing
Properties producerConfig = new Properties();
producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1");
producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
producerConfig.put(AWSConfigConstants.AWS_ENDPOINT, "http://localhost:4567");
对于 Flink 生产者,这些指令适用于本地运动(我使用 Kinesalite)。
但是,对于 Flink 消费者,我得到一个例外,即 aws.region
和 aws.endpoint
不允许 both。但是区域是必需的,这意味着它不可能覆盖端点。
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: For FlinkKinesisConsumer either AWS region ('aws.region') or AWS endpoint ('aws.endpoint') must be set in the config.
这是连接器中的错误吗?我看到一个相关的 PR:https://github.com/apache/flink/pull/6045 .
我找到了 workaround on Flink's mailing list,但他们将此描述为生产者而非消费者的问题,而我的看法恰恰相反(我认为),因此对此不确定。真是一头雾水。
此问题与验证检查中的 XOR
条件有关。
如您所见,validateConsumerConfiguration
方法在 if 语句中执行 XOR 验证。因此,您只能指定选中的两个参数之一。
要设置自定义 URL,您需要删除 AWSConfigConstants.AWS_REGION
属性并仅使用 link。
// Set the given URL
consumerConfig.put(AWSConfigConstants.AWS_ENDPOINT, URL);
// Remove the region
consumerConfig.remove(AWSConfigConstants.AWS_REGION);
此解决方案,修复与以下 StackTrace 相关的错误:
java.lang.IllegalArgumentException: For FlinkKinesisConsumer either AWS region ('aws.region') or AWS endpoint ('aws.endpoint') must be set in the config.
at org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil.validateConsumerConfiguration(KinesisConfigUtil.java:92)
自提出这个问题以来,已经取得了一些进展。
提问者在 this jira which was marked as a duplicate of this second jira 中推送了问题。
该问题现在应该已解决,并且可用于 1.10 及更高版本的修复程序。