PySpark-streaming: How to access files sent using --files

I am running a PySpark streaming client that uses Kafka, and I want to ship files to the cluster.
I am using the --files option:

spark-submit --master yarn \
--deploy-mode client \
--jars "/home/aiman/testing_aiman/spark-sql-kafka-0-10_2.11-2.4.0-cdh6.3.4.jar" \
--files /home/aiman/testing_aiman/kafka.keystore.uat.jks#keystore.jks,/home/aiman/testing_aiman/kafka.truststore.uat.jks#truststore.jks \
sparkStreamingTest.py

Then I try to access the files using SparkFiles.get():

from pyspark.sql import SparkSession
from pyspark import SparkFiles


spark = SparkSession.builder.appName("Test Streaming").getOrCreate()

# Get the Keystore File and Truststore File
keystore = str(SparkFiles.get('keystore.jks'))
truststore = str(SparkFiles.get('truststore.jks'))

df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers","kafka.server.com:9093") \
    .option("subscribe","TEST_TOPIC") \
    .option("startingOffsets", "earliest") \
    .option("kafka.security.protocol","SSL") \
    .option("kafka.ssl.keystore.location", keystore) \
    .option("kafka.ssl.keystore.password", "abcd") \
    .option("kafka.ssl.key.password","abcd") \
    .option("kafka.ssl.truststore.type","JKS") \
    .option("kafka.ssl.truststore.location", truststore) \
    .option("kafka.ssl.truststore.password","abcd") \
    .option("kafka.ssl.enabled.protocols","TLSv1") \
    .option("kafka.ssl.endpoint.identification.algorithm","") \
    .load()
....
...

But I still get a NoSuchFileException:

Caused by: org.apache.kafka.common.KafkaException: Failed to load SSL keystore /tmp/spark-4578a498-f96d-4c8a-a716-e128d90531fb/userFiles-5792bc5c-d513-4aa3-9014-26df66ace1db/keystore.jks of type JKS
        at org.apache.kafka.common.security.ssl.SslFactory$SecurityStore.load(SslFactory.java:357)
        at org.apache.kafka.common.security.ssl.SslFactory.createSSLContext(SslFactory.java:240)
        at org.apache.kafka.common.security.ssl.SslFactory.configure(SslFactory.java:141)
        ... 55 more
Caused by: java.nio.file.NoSuchFileException: /tmp/spark-4578a498-f96d-4c8a-a716-e128d90531fb/userFiles-5792bc5c-d513-4aa3-9014-26df66ace1db/keystore.jks
        at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
        at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
        at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
        at sun.nio.fs.UnixFileSystemProvider.newByteChannel(UnixFileSystemProvider.java:214)
        at java.nio.file.Files.newByteChannel(Files.java:361)
        at java.nio.file.Files.newByteChannel(Files.java:407)
        at java.nio.file.spi.FileSystemProvider.newInputStream(FileSystemProvider.java:384)
        at java.nio.file.Files.newInputStream(Files.java:152)
        at org.apache.kafka.common.security.ssl.SslFactory$SecurityStore.load(SslFactory.java:350)
        ... 57 more

What am I doing wrong?

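Instead of using SparkFiles.get() to obtain an absolute path, use the file names directly, and remove the #keystore.jks and #truststore.jks suffixes from the --files option in the spark-submit command: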

spark-submit --master yarn \
--deploy-mode client \
--jars "/home/aiman/testing_aiman/spark-sql-kafka-0-10_2.11-2.4.0-cdh6.3.4.jar" \
--files /home/aiman/testing_aiman/kafka.keystore.uat.jks,/home/aiman/testing_aiman/kafka.truststore.uat.jks \
sparkStreamingTest.py

Then use the files' actual names:

# SparkFiles.get() is no longer used, so these lines are commented out
# keystore = str(SparkFiles.get('keystore.jks'))
# truststore = str(SparkFiles.get('truststore.jks'))

df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers","kafka.server.com:9093") \
    .option("subscribe","TEST_TOPIC") \
    .option("startingOffsets", "earliest") \
    .option("kafka.security.protocol","SSL") \
    .option("kafka.ssl.keystore.location", "kafka.keystore.uat.jks") \
    .option("kafka.ssl.keystore.password", "abcd") \
    .option("kafka.ssl.key.password","abcd") \
    .option("kafka.ssl.truststore.type","JKS") \
    .option("kafka.ssl.truststore.location", "kafka.truststore.uat.jks") \
    .option("kafka.ssl.truststore.password","abcd") \
    .option("kafka.ssl.enabled.protocols","TLSv1") \
    .option("kafka.ssl.endpoint.identification.algorithm","") \
    .load()
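
If it is unclear where a shipped file actually ends up, a small diagnostic can help. The sketch below is not part of the original answer: find_shipped_file is a hypothetical helper name, and it relies only on the Python standard library. It looks through a list of candidate directories (by default the current working directory, which is where YARN is expected to place files passed with --files inside a container) and returns the first path at which the file exists, or None.

```python
import os


def find_shipped_file(file_name, search_dirs=None):
    """Return the first existing path for file_name in search_dirs, or None.

    Hypothetical diagnostic helper: search_dirs defaults to the current
    working directory of the calling process.
    """
    if search_dirs is None:
        search_dirs = [os.getcwd()]
    for directory in search_dirs:
        candidate = os.path.join(directory, file_name)
        # Only accept regular files, not directories with the same name
        if os.path.isfile(candidate):
            return candidate
    return None


if __name__ == "__main__":
    # Prints the resolved path, or None if the file is not in the working directory
    print(find_shipped_file("kafka.keystore.uat.jks"))
```

On the driver, one could also pass [os.getcwd(), SparkFiles.getRootDirectory()] as search_dirs to compare the two locations. A result of None for a given directory suggests that a path built from it would fail the same way as in the stack trace above.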