Pyflink 1.14 table 连接器 - Kafka 身份验证
Pyflink 1.14 table connectors - Kafka authentication
我只看到 Pyflink table API kafka 连接的例子,不 在连接建立中包含认证细节(doc ref), 即来源table 连接:
source_ddl = """
CREATE TABLE source_table(
a VARCHAR,
b INT
) WITH (
'connector' = 'kafka',
'topic' = 'source_topic',
'properties.bootstrap.servers' = 'kafka:9092',
'properties.group.id' = 'test_3',
'scan.startup.mode' = 'latest-offset',
'format' = 'json'
)
"""
但是我需要在启用身份验证的情况下连接到 kafka 源。通过 'interpreting' 所有 property.XXX 都专用于 kafka 配置,我将示例更改如下并进行测试:
import os
from pyflink.datastream.stream_execution_environment import StreamExecutionEnvironment
from pyflink.table import TableEnvironment, EnvironmentSettings, environment_settings
from pyflink.table.table_environment import StreamTableEnvironment
KAFKA_SERVERS = 'localhost:9092'
KAFKA_USERNAME = "user"
KAFKA_PASSWORD = "XXX"
KAFKA_SOURCE_TOPIC = 'source'
KAFKA_SINK_TOPIC = 'dest'
def log_processing():
env = StreamExecutionEnvironment.get_execution_environment()
env.add_jars("file:///opt/flink/lib_py/kafka-clients-2.4.1.jar")
env.add_jars("file:///opt/flink/lib_py/flink-connector-kafka_2.11-1.14.0.jar")
env.add_jars("file:///opt/flink/lib_py/flink-sql-connector-kafka_2.12-1.14.0.jar")
settings = EnvironmentSettings.new_instance()\
.in_streaming_mode()\
.use_blink_planner()\
.build()
t_env = StreamTableEnvironment.create(stream_execution_environment= env, environment_settings=settings)
source_ddl = f"""
CREATE TABLE source_table(
Cylinders INT,
Displacement INT,
Horsepower INT,
Weight INT,
Acceleration INT,
Model_Year INT,
USA INT,
Europe INT,
Japan INT
) WITH (
'connector' = 'kafka',
'topic' = '{KAFKA_SOURCE_TOPIC}',
'properties.bootstrap.servers' = '{KAFKA_SERVERS}',
'properties.group.id' = 'testgroup12',
'properties.sasl.mechanism' = 'PLAIN',
'properties.security.protocol' = 'SASL_PLAINTEXT',
'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username=\"{KAFKA_USERNAME}\" password=\"{KAFKA_PASSWORD}\";',
'scan.startup.mode' = 'latest-offset',
'format' = 'json'
)
"""
sink_ddl = f"""
CREATE TABLE sink_table(
Cylinders INT,
Displacement INT,
Horsepower INT,
Weight INT,
Acceleration INT,
Model_Year INT,
USA INT,
Europe INT,
Japan INT
) WITH (
'connector' = 'kafka',
'topic' = '{KAFKA_SINK_TOPIC}',
'properties.bootstrap.servers' = '{KAFKA_SERVERS}',
'properties.group.id' = 'testgroup12',
'properties.sasl.mechanism' = 'PLAIN',
'properties.security.protocol' = 'SASL_PLAINTEXT',
'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username=\"{KAFKA_USERNAME}\" password=\"{KAFKA_PASSWORD}\";',
'scan.startup.mode' = 'latest-offset',
'format' = 'json'
)
"""
t_env.execute_sql(source_ddl)
t_env.execute_sql(sink_ddl)
t_env.sql_query("SELECT * FROM source_table").execute_insert("sink_table").wait()
t_env.execute("kafka-table")
if __name__ == '__main__':
log_processing()
通过从 cli 添加此作业,没有响应或指示使用相应作业 ID 实例化作业:
Respectively no job created when viewing flink UI
如果我错误地配置了连接,有人可以纠正我,或者指出相关的文档来源(我已经用谷歌搜索了很多...)
按照@DavidAnderson 的建议找到问题所在。我的问题中的代码按原样工作……只需要分别更新依赖项 jar。如果使用 Scala 2.12 和 flink 版本 1.14,则适用以下依赖项(jar 依赖项已下载并可在相应目录中的 jobManager 上使用) :
env.add_jars("file:///opt/flink/lib_py/kafka-clients-2.4.1.jar")
env.add_jars("file:///opt/flink/lib_py/flink-connector-kafka_2.12-1.14.0.jar")
env.add_jars("file:///opt/flink/lib_py/flink-sql-connector-kafka_2.12-1.14.0.jar")
我后来发现的一个有用的参考站点是 https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka_2.12/1.14.0
我只看到 Pyflink table API kafka 连接的例子,不 在连接建立中包含认证细节(doc ref), 即来源table 连接:
source_ddl = """
CREATE TABLE source_table(
a VARCHAR,
b INT
) WITH (
'connector' = 'kafka',
'topic' = 'source_topic',
'properties.bootstrap.servers' = 'kafka:9092',
'properties.group.id' = 'test_3',
'scan.startup.mode' = 'latest-offset',
'format' = 'json'
)
"""
但是我需要在启用身份验证的情况下连接到 kafka 源。通过 'interpreting' 所有 property.XXX 都专用于 kafka 配置,我将示例更改如下并进行测试:
import os
from pyflink.datastream.stream_execution_environment import StreamExecutionEnvironment
from pyflink.table import TableEnvironment, EnvironmentSettings, environment_settings
from pyflink.table.table_environment import StreamTableEnvironment
KAFKA_SERVERS = 'localhost:9092'
KAFKA_USERNAME = "user"
KAFKA_PASSWORD = "XXX"
KAFKA_SOURCE_TOPIC = 'source'
KAFKA_SINK_TOPIC = 'dest'
def log_processing():
env = StreamExecutionEnvironment.get_execution_environment()
env.add_jars("file:///opt/flink/lib_py/kafka-clients-2.4.1.jar")
env.add_jars("file:///opt/flink/lib_py/flink-connector-kafka_2.11-1.14.0.jar")
env.add_jars("file:///opt/flink/lib_py/flink-sql-connector-kafka_2.12-1.14.0.jar")
settings = EnvironmentSettings.new_instance()\
.in_streaming_mode()\
.use_blink_planner()\
.build()
t_env = StreamTableEnvironment.create(stream_execution_environment= env, environment_settings=settings)
source_ddl = f"""
CREATE TABLE source_table(
Cylinders INT,
Displacement INT,
Horsepower INT,
Weight INT,
Acceleration INT,
Model_Year INT,
USA INT,
Europe INT,
Japan INT
) WITH (
'connector' = 'kafka',
'topic' = '{KAFKA_SOURCE_TOPIC}',
'properties.bootstrap.servers' = '{KAFKA_SERVERS}',
'properties.group.id' = 'testgroup12',
'properties.sasl.mechanism' = 'PLAIN',
'properties.security.protocol' = 'SASL_PLAINTEXT',
'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username=\"{KAFKA_USERNAME}\" password=\"{KAFKA_PASSWORD}\";',
'scan.startup.mode' = 'latest-offset',
'format' = 'json'
)
"""
sink_ddl = f"""
CREATE TABLE sink_table(
Cylinders INT,
Displacement INT,
Horsepower INT,
Weight INT,
Acceleration INT,
Model_Year INT,
USA INT,
Europe INT,
Japan INT
) WITH (
'connector' = 'kafka',
'topic' = '{KAFKA_SINK_TOPIC}',
'properties.bootstrap.servers' = '{KAFKA_SERVERS}',
'properties.group.id' = 'testgroup12',
'properties.sasl.mechanism' = 'PLAIN',
'properties.security.protocol' = 'SASL_PLAINTEXT',
'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username=\"{KAFKA_USERNAME}\" password=\"{KAFKA_PASSWORD}\";',
'scan.startup.mode' = 'latest-offset',
'format' = 'json'
)
"""
t_env.execute_sql(source_ddl)
t_env.execute_sql(sink_ddl)
t_env.sql_query("SELECT * FROM source_table").execute_insert("sink_table").wait()
t_env.execute("kafka-table")
if __name__ == '__main__':
log_processing()
通过从 cli 添加此作业,没有响应或指示使用相应作业 ID 实例化作业:
Respectively no job created when viewing flink UI
如果我错误地配置了连接,有人可以纠正我,或者指出相关的文档来源(我已经用谷歌搜索了很多...)
按照@DavidAnderson 的建议找到问题所在。我的问题中的代码按原样工作……只需要分别更新依赖项 jar。如果使用 Scala 2.12 和 flink 版本 1.14,则适用以下依赖项(jar 依赖项已下载并可在相应目录中的 jobManager 上使用) :
env.add_jars("file:///opt/flink/lib_py/kafka-clients-2.4.1.jar")
env.add_jars("file:///opt/flink/lib_py/flink-connector-kafka_2.12-1.14.0.jar")
env.add_jars("file:///opt/flink/lib_py/flink-sql-connector-kafka_2.12-1.14.0.jar")
我后来发现的一个有用的参考站点是 https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka_2.12/1.14.0