GCP Dataproc - Failed to construct kafka consumer, Failed to load SSL keystore dataproc.jks of type JKS
I am trying to run a Structured Streaming program on GCP Dataproc that reads data from Kafka and prints it.
Kafka is accessed over SSL, with the truststore and keystore files stored in a GCS bucket.
I am using the Google Cloud Storage API to access the bucket and download the files into the current working directory. The truststore and keystore are then passed to the Kafka consumer/producer.
However, I am getting an error.
Command:
gcloud dataproc jobs submit pyspark /Users/karanalang/Documents/Technology/gcp/DataProc/StructuredStreaming_Kafka_GCP-Batch-feb2-v2.py --cluster dataproc-ss-poc --properties spark.jars.packages=org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.0 --region us-central1
Code:
from pyspark.sql import SparkSession
from google.cloud import storage
import os

spark = SparkSession.builder.appName('StructuredStreaming_VersaSase').getOrCreate()
kafkaBrokers='<broker-ip>:9094'
topic = "versa-sase"
security_protocol="SSL"
# Google Storage API to access the keys in the buckets
client = storage.Client()
bucket = client.get_bucket('ssl-certs-karan')
blob_ssl_truststore = bucket.get_blob('cap12.jks')
ssl_truststore_location = '{}/{}'.format(os.getcwd(), blob_ssl_truststore.name)
blob_ssl_truststore.download_to_filename(ssl_truststore_location)
ssl_truststore_password="<ssl_truststore_password>"
blob_ssl_keystore = bucket.get_blob('dataproc-versa-sase-p12-1.jks')
ssl_keystore_location = '{}/{}'.format(os.getcwd(), blob_ssl_keystore.name)
blob_ssl_keystore.download_to_filename(ssl_keystore_location)
ssl_keystore_password="<ssl_keystore_password>"
consumerGroupId = "versa-sase-grp"
checkpoint = "gs://ss-checkpoint/"
print(" SPARK.SPARKCONTEXT -> ", spark.sparkContext)
df = spark.read.format('kafka')\
.option("kafka.bootstrap.servers",kafkaBrokers)\
.option("kafka.security.protocol","SSL") \
.option("kafka.ssl.truststore.location",ssl_truststore_location) \
.option("kafka.ssl.truststore.password",ssl_truststore_password) \
.option("kafka.ssl.keystore.location", ssl_keystore_location)\
.option("kafka.ssl.keystore.password", ssl_keystore_password)\
.option("subscribe", topic) \
.option("kafka.group.id", consumerGroupId)\
.option("startingOffsets", "earliest") \
.load()
print(" df -> ", df)
query = df.selectExpr("CAST(value AS STRING)", "CAST(key AS STRING)", "topic", "timestamp") \
.write \
.format("console") \
.option("numRows",100)\
.option("checkpointLocation", checkpoint) \
.option("outputMode", "complete")\
.option("truncate", "false") \
.save("output")
Error:
Traceback (most recent call last):
File "/tmp/3e7304f8e27d4436a2f382280cebe7c5/StructuredStreaming_Kafka_GCP-Batch-feb2-v2.py", line 83, in <module>
query = df.selectExpr("CAST(value AS STRING)", "CAST(key AS STRING)", "topic", "timestamp") \
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 1109, in save
File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1304, in __call__
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 111, in deco
File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 326, in get_return_value
py4j.protocol.Py4JJavaError22/02/02 23:11:08 DEBUG org.apache.hadoop.ipc.Client: IPC Client (1416219052) connection to dataproc-ss-poc-m/10.128.0.78:8030 from root sending #171 org.apache.hadoop.yarn.api.ApplicationMasterProtocolPB.allocate
22/02/02 23:11:08 DEBUG org.apache.hadoop.ipc.Client: IPC Client (1416219052) connection to dataproc-ss-poc-m/10.128.0.78:8030 from root got value #171
22/02/02 23:11:08 DEBUG org.apache.hadoop.ipc.ProtobufRpcEngine: Call: allocate took 2ms
: An error occurred while calling o84.save.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3) (dataproc-ss-poc-w-0.c.versa-kafka-poc.internal executor 1): org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:823)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:665)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:613)
at org.apache.spark.sql.kafka010.consumer.InternalKafkaConsumer.createConsumer(KafkaDataConsumer.scala:124)
at org.apache.spark.sql.kafka010.consumer.InternalKafkaConsumer.<init>(KafkaDataConsumer.scala:61)
at org.apache.spark.sql.kafka010.consumer.InternalKafkaConsumerPool$ObjectFactory.create(InternalKafkaConsumerPool.scala:206)
at org.apache.spark.sql.kafka010.consumer.InternalKafkaConsumerPool$ObjectFactory.create(InternalKafkaConsumerPool.scala:201)
at org.apache.commons.pool2.BaseKeyedPooledObjectFactory.makeObject(BaseKeyedPooledObjectFactory.java:60)
at org.apache.commons.pool2.impl.GenericKeyedObjectPool.create(GenericKeyedObjectPool.java:1041)
at org.apache.commons.pool2.impl.GenericKeyedObjectPool.borrowObject(GenericKeyedObjectPool.java:342)
at org.apache.commons.pool2.impl.GenericKeyedObjectPool.borrowObject(GenericKeyedObjectPool.java:265)
at org.apache.spark.sql.kafka010.consumer.InternalKafkaConsumerPool.borrowObject(InternalKafkaConsumerPool.scala:84)
at org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer.retrieveConsumer(KafkaDataConsumer.scala:573)
at org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer.getOrRetrieveConsumer(KafkaDataConsumer.scala:558)
at org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer.$anonfun$getAvailableOffsetRange(KafkaDataConsumer.scala:359)
at org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:77)
at org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer.runUninterruptiblyIfPossible(KafkaDataConsumer.scala:618)
at org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer.getAvailableOffsetRange(KafkaDataConsumer.scala:358)
at org.apache.spark.sql.kafka010.KafkaSourceRDD.resolveRange(KafkaSourceRDD.scala:123)
at org.apache.spark.sql.kafka010.KafkaSourceRDD.compute(KafkaSourceRDD.scala:75)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
...
Caused by: org.apache.kafka.common.KafkaException: Failed to load SSL keystore /tmp/3e7304f8e27d4436a2f382280cebe7c5/dataproc-versa-sase-p12-1.jks of type JKS
at org.apache.kafka.common.security.ssl.DefaultSslEngineFactory$FileBasedStore.load(DefaultSslEngineFactory.java:377)
at org.apache.kafka.common.security.ssl.DefaultSslEngineFactory$FileBasedStore.<init>(DefaultSslEngineFactory.java:349)
at org.apache.kafka.common.security.ssl.DefaultSslEngineFactory.createKeystore(DefaultSslEngineFactory.java:299)
at org.apache.kafka.common.security.ssl.DefaultSslEngineFactory.configure(DefaultSslEngineFactory.java:161)
at org.apache.kafka.common.security.ssl.SslFactory.instantiateSslEngineFactory(SslFactory.java:138)
at org.apache.kafka.common.security.ssl.SslFactory.configure(SslFactory.java:95)
at org.apache.kafka.common.network.SslChannelBuilder.configure(SslChannelBuilder.java:74)
at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:192)
at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:81)
at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:105)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:737)
... 53 more
Caused by: java.nio.file.NoSuchFileException: /tmp/3e7304f8e27d4436a2f382280cebe7c5/dataproc-versa-sase-p12-1.jks
at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
at sun.nio.fs.UnixFileSystemProvider.newByteChannel(UnixFileSystemProvider.java:214)
at java.nio.file.Files.newByteChannel(Files.java:361)
at java.nio.file.Files.newByteChannel(Files.java:407)
at java.nio.file.spi.FileSystemProvider.newInputStream(FileSystemProvider.java:384)
at java.nio.file.Files.newInputStream(Files.java:152)
at org.apache.kafka.common.security.ssl.DefaultSslEngineFactory$FileBasedStore.load(DefaultSslEngineFactory.java:370)
On my Mac I am using a PKCS12 file (.p12) and am able to access the Kafka cluster in SSL mode. On Dataproc, however, the expected file format seems to be JKS.
Here is the command I used to convert the .p12 file to JKS format:
keytool -importkeystore -srckeystore dataproc-versa-sase.p12 -srcstoretype pkcs12 -srcalias versa-sase-user -destkeystore dataproc-versa-sase-p12-1.jks -deststoretype jks -deststorepass <password> -destalias versa-sase-user
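For what it's worth, the converted store can be sanity-checked with a keytool listing (a sketch; the password placeholder is the same one used in the conversion command above), which should print the versa-sase-user entry:
keytool -list -v -keystore dataproc-versa-sase-p12-1.jks -storepass <password>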
What needs to be done to fix this? It looks like the Spark program is unable to access the JKS files.
TIA!
If you want to use JKS, I would add the following options:
.option("kafka.ssl.keystore.type", "JKS")
.option("kafka.ssl.truststore.type", "JKS")
By the way, this also works with PKCS12:
.option("kafka.ssl.keystore.type", "PKCS12")
.option("kafka.ssl.truststore.type", "PKCS12")
As mentioned earlier, you can check whether it is a JDK compatibility issue like this:
keytool -v -list -storetype pkcs12 -keystore kafka-client-jdk8-truststore.p12
If the output shows the keystore contents, you are fine; but if you get a message saying the identifier cannot be found, it means there is a mismatch between the JDK versions.
Per @OneCricketer's note, I was able to make this work by passing the certs with --files <gs://cert1>,<gs://cert2>.
This also works when using cluster mode.
Cluster mode command:
gcloud dataproc jobs submit pyspark /Users/karanalang/Documents/Technology/gcp/DataProc/StructuredStreaming_Kafka_GCP-Batch-feb2-v2.py --cluster dataproc-ss-poc --properties spark.jars.packages=org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2,spark.submit.deployMode=cluster --region us-central1
Client mode command:
gcloud dataproc jobs submit pyspark /Users/karanalang/Documents/Technology/gcp/DataProc/StructuredStreaming_Kafka_GCP-Batch-feb2-v2.py --cluster dataproc-ss-poc --properties spark.jars.packages=org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2 --region us-central1
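For reference, a sketch of the cluster-mode submit with the certs shipped via --files (the gs:// paths below are assumptions based on the bucket and file names used elsewhere in this post):
gcloud dataproc jobs submit pyspark /Users/karanalang/Documents/Technology/gcp/DataProc/StructuredStreaming_Kafka_GCP-Batch-feb2-v2.py --cluster dataproc-ss-poc --files gs://ssl-certs-karan/ca.p12,gs://ssl-certs-karan/dataproc-versa-sase.p12 --properties spark.jars.packages=org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2,spark.submit.deployMode=cluster --region us-central1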
Accessing the certs in the driver:
# access using the cert name
ssl_truststore_location="ca.p12"
ssl_keystore_location="dataproc-versa-sase.p12"
df_stream = spark.readStream.format('kafka') \
.option("kafka.security.protocol", "SSL") \
.option("kafka.ssl.truststore.location", ssl_truststore_location) \
.option("kafka.ssl.truststore.password", ssl_truststore_password) \
.option("kafka.ssl.keystore.location", ssl_keystore_location) \
.option("kafka.ssl.keystore.password", ssl_keystore_password) \
.option("kafka.bootstrap.servers",kafkaBrokers)\
.option("subscribe", topic) \
.option("kafka.group.id", consumerGroupId)\
.option("startingOffsets", "earliest") \
.option("failOnDataLoss", "false") \
.option("maxOffsetsPerTrigger", 10) \
.load()
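To actually exercise the stream, a minimal console sink (a sketch; checkpoint is the GCS checkpoint location defined in the question) could be:
query = df_stream.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "topic", "timestamp") \
    .writeStream \
    .format("console") \
    .option("checkpointLocation", checkpoint) \
    .option("truncate", "false") \
    .start()
query.awaitTermination()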