Can't connect PyFlink source to AWS Kinesis
I'm using PyFlink and trying to use AWS Kinesis as a source for the Table API, following these instructions:
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/python/table/python_table_api_connectors/
I'm using the connector from here:
https://mvnrepository.com/artifact/org.apache.flink/flink-sql-connector-kinesis/1.15.0
The code fails with the following error, even though I have set my region:
Caused by: java.lang.IllegalArgumentException: For FlinkKinesisConsumer AWS region ('aws.region') and/or AWS endpoint ('aws.endpoint') must be set in the config.
My code is as follows:
from pyflink.table import (EnvironmentSettings, TableEnvironment, TableDescriptor, Schema,
                           DataTypes, FormatDescriptor)
t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
t_env.get_config().set("parallelism.default", "1")
t_env.get_config().set("pipeline.jars", "file:///home/ubuntu/connectors/flink-sql-connector-kinesis-1.15.0.jar")
t_env.get_config().set("aws.region", "eu-west-2")
source_ddl = """
    CREATE TABLE source_table(
        a VARCHAR,
        b INT
    ) WITH (
        'connector' = 'kinesis',
        'stream' = 'ais_raw',
        'scan.startup.mode' = 'latest-offset',
        'format' = 'json'
    )
"""
t_env.execute_sql(source_ddl)
t_env.sql_query("SELECT a FROM source_table")
Any help would be appreciated.
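The aws.region option should be set in the connector options, not in the table environment configuration. That is, you need to define the source as follows: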
source_ddl = """
    CREATE TABLE source_table(
        a VARCHAR,
        b INT
    ) WITH (
        'connector' = 'kinesis',
        'stream' = 'ais_raw',
        'scan.startup.mode' = 'latest-offset',
        'format' = 'json',
        'aws.region' = 'eu-west-2'
    )
"""
For details, see the Kinesis connector documentation.
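If the connector options need to vary between environments (for example a different region or stream name), one way to keep them all inside the WITH clause is to render the DDL from a dictionary. The following is only a sketch: build_kinesis_source_ddl is a hypothetical helper, not part of PyFlink, and it does no escaping, so it assumes the option values contain no single quotes.

```python
def build_kinesis_source_ddl(table_name, columns, options):
    """Render a CREATE TABLE statement with every option in the WITH clause.

    Hypothetical helper for illustration; not a PyFlink API.
    Assumes names and values need no SQL escaping.
    """
    column_sql = ",\n".join(f"  {name} {sql_type}" for name, sql_type in columns)
    option_sql = ",\n".join(f"  '{key}' = '{value}'" for key, value in options.items())
    return (
        f"CREATE TABLE {table_name} (\n{column_sql}\n"
        f") WITH (\n{option_sql}\n)"
    )


# Same table as in the answer; 'aws.region' travels with the connector options.
source_ddl = build_kinesis_source_ddl(
    "source_table",
    [("a", "VARCHAR"), ("b", "INT")],
    {
        "connector": "kinesis",
        "stream": "ais_raw",
        "scan.startup.mode": "latest-offset",
        "format": "json",
        "aws.region": "eu-west-2",
    },
)
print(source_ddl)
```

The resulting string can be passed to t_env.execute_sql(source_ddl) exactly as in the question. The t_env.get_config().set("aws.region", ...) line from the question can then be dropped, since the error indicates the connector does not pick the region up from there.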