在 Google Dataflow 上使用 KafkaIO 通过 SSL 连接到 Kafka
Connect to Kafka with SSL using KafkaIO on Google Dataflow
从服务器上,我能够连接并从配置了 SSL 的远程 kafka 服务器主题中获取数据。
我如何从 GCP 连接到远程 kafka 服务器,使用 Google 数据流管道传递 SSL 信任库、密钥库证书位置和 Google 服务帐户 json?
我正在使用 Eclipse 插件作为数据流运行器选项。
如果我指向 GCS 上的证书,当证书指向 Google 存储桶时会抛出错误。
Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
Caused by: org.apache.kafka.common.KafkaException:
java.io.FileNotFoundException:
gs:/bucket/folder/truststore-client.jks (No such file or directory)
已关注:
更新代码指向 SSL 信任库,密钥库位置到本地机器的 /tmp 目录证书,以防 KafkaIO 需要从文件路径读取。它没有抛出 FileNotFoundError。
尝试了 运行 来自 GCP 帐户的服务器 Java 客户端代码,并且还使用了 Dataflow - Beam Java 管道,我收到以下错误。
ssl.truststore.location = <LOCAL MACHINE CERTICATE FILE PATH>
ssl.truststore.password = [hidden]
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
org.apache.kafka.common.utils.AppInfoParser$AppInfo <init>
INFO: Kafka version : 1.0.0
org.apache.kafka.common.utils.AppInfoParser$AppInfo <init>
INFO: Kafka commitId : aaa7af6d4a11b29d
org.apache.kafka.common.network.SslTransportLayer close
WARNING: Failed to send SSL Close message
java.io.IOException: Broken pipe
org.apache.beam.runners.direct.RootProviderRegistry.getInitialInputs(RootProviderRegistry.java:81)
at org.apache.beam.runners.direct.ExecutorServiceParallelExecutor.start(ExecutorServiceParallelExecutor.java:153)
at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:205)
at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:66)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:311)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
at
org.apache.kafka.common.utils.LogContext$KafkaLogger warn
WARNING: [Consumer clientId=consumer-1, groupId=test-group] Connection to node -2 terminated during authentication. This may indicate that authentication failed due to invalid credentials.
任何建议或示例表示赞赏。
Git 从本地机器克隆或上传 Java Maven 项目到 GCP Cloud Shell 主目录。
在 Cloud Shell 终端上使用 Dataflow runner 命令编译项目。
mvn -Pdataflow-runner compile exec:java \
-Dexec.mainClass=com.packagename.JavaClass \
-Dexec.args="--project=PROJECT_ID \
--stagingLocation=gs://BUCKET/PATH/ \
--tempLocation=gs://BUCKET/temp/ \
--output=gs://BUCKET/PATH/output \
--runner=DataflowRunner"
确保运行器设置为 DataflowRunnner.class,当 运行 在云端时,您会在数据流控制台上看到该作业。 DirectRunner 执行不会显示在云数据流控制台上。
将证书放在 Maven 项目的资源文件夹中,并使用 ClassLoader 读取文件。
ClassLoader classLoader = getClass().getClassLoader();
File file = new File(classLoader.getResource("keystore.jks").getFile());
resourcePath.put("keystore.jks",file.getAbsoluteFile().getPath());
编写一个 ConsumerFactoryFn() 以复制 Dataflow 的“/tmp/”目录中的证书,如
中所述
使用具有资源路径属性的 KafkaIO。
Properties props = new Properties();
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/tmp/truststore.jks");
props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "/tmp/keystore.jks");
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, PASSWORD);
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, PASSWORD);
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, PASSWORD);
//other properties
...
PCollection<String> collection = p.apply(KafkaIO.<String, String>read()
.withBootstrapServers(BOOTSTRAP_SERVERS)
.withTopic(TOPIC)
.withKeyDeserializer(StringDeserializer.class)
.withValueDeserializer(StringDeserializer.class)
.updateConsumerProperties(props)
.withConsumerFactoryFn(new ConsumerFactoryFn())
.withMaxNumRecords(50)
.withoutMetadata()
).apply(Values.<String>create());
// Apply Beam transformations and write to output.
从服务器上,我能够连接并从配置了 SSL 的远程 kafka 服务器主题中获取数据。
我如何从 GCP 连接到远程 kafka 服务器,使用 Google 数据流管道传递 SSL 信任库、密钥库证书位置和 Google 服务帐户 json?
我正在使用 Eclipse 插件作为数据流运行器选项。
如果我指向 GCS 上的证书,当证书指向 Google 存储桶时会抛出错误。
Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
Caused by: org.apache.kafka.common.KafkaException:
java.io.FileNotFoundException:
gs:/bucket/folder/truststore-client.jks (No such file or directory)
已关注:
更新代码指向 SSL 信任库,密钥库位置到本地机器的 /tmp 目录证书,以防 KafkaIO 需要从文件路径读取。它没有抛出 FileNotFoundError。
尝试了 运行 来自 GCP 帐户的服务器 Java 客户端代码,并且还使用了 Dataflow - Beam Java 管道,我收到以下错误。
ssl.truststore.location = <LOCAL MACHINE CERTICATE FILE PATH>
ssl.truststore.password = [hidden]
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
org.apache.kafka.common.utils.AppInfoParser$AppInfo <init>
INFO: Kafka version : 1.0.0
org.apache.kafka.common.utils.AppInfoParser$AppInfo <init>
INFO: Kafka commitId : aaa7af6d4a11b29d
org.apache.kafka.common.network.SslTransportLayer close
WARNING: Failed to send SSL Close message
java.io.IOException: Broken pipe
org.apache.beam.runners.direct.RootProviderRegistry.getInitialInputs(RootProviderRegistry.java:81)
at org.apache.beam.runners.direct.ExecutorServiceParallelExecutor.start(ExecutorServiceParallelExecutor.java:153)
at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:205)
at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:66)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:311)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
at
org.apache.kafka.common.utils.LogContext$KafkaLogger warn
WARNING: [Consumer clientId=consumer-1, groupId=test-group] Connection to node -2 terminated during authentication. This may indicate that authentication failed due to invalid credentials.
任何建议或示例表示赞赏。
Git 从本地机器克隆或上传 Java Maven 项目到 GCP Cloud Shell 主目录。 在 Cloud Shell 终端上使用 Dataflow runner 命令编译项目。
mvn -Pdataflow-runner compile exec:java \
-Dexec.mainClass=com.packagename.JavaClass \
-Dexec.args="--project=PROJECT_ID \
--stagingLocation=gs://BUCKET/PATH/ \
--tempLocation=gs://BUCKET/temp/ \
--output=gs://BUCKET/PATH/output \
--runner=DataflowRunner"
确保运行器设置为 DataflowRunnner.class,当 运行 在云端时,您会在数据流控制台上看到该作业。 DirectRunner 执行不会显示在云数据流控制台上。
将证书放在 Maven 项目的资源文件夹中,并使用 ClassLoader 读取文件。
ClassLoader classLoader = getClass().getClassLoader();
File file = new File(classLoader.getResource("keystore.jks").getFile());
resourcePath.put("keystore.jks",file.getAbsoluteFile().getPath());
编写一个 ConsumerFactoryFn() 以复制 Dataflow 的“/tmp/”目录中的证书,如
使用具有资源路径属性的 KafkaIO。
Properties props = new Properties();
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/tmp/truststore.jks");
props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "/tmp/keystore.jks");
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, PASSWORD);
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, PASSWORD);
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, PASSWORD);
//other properties
...
PCollection<String> collection = p.apply(KafkaIO.<String, String>read()
.withBootstrapServers(BOOTSTRAP_SERVERS)
.withTopic(TOPIC)
.withKeyDeserializer(StringDeserializer.class)
.withValueDeserializer(StringDeserializer.class)
.updateConsumerProperties(props)
.withConsumerFactoryFn(new ConsumerFactoryFn())
.withMaxNumRecords(50)
.withoutMetadata()
).apply(Values.<String>create());
// Apply Beam transformations and write to output.