PySpark Streaming: How to access files sent using --files
I am running a PySpark Streaming client that consumes from Kafka, and I want to ship some files to the cluster. I am using the --files option:
spark-submit --master yarn \
--deploy-mode client \
--jars "/home/aiman/testing_aiman/spark-sql-kafka-0-10_2.11-2.4.0-cdh6.3.4.jar" \
--files /home/aiman/testing_aiman/kafka.keystore.uat.jks#keystore.jks,/home/aiman/testing_aiman/kafka.truststore.uat.jks#truststore.jks \
sparkStreamingTest.py
and trying to access the files using SparkFiles.get():
from pyspark.sql import SparkSession
from pyspark import SparkFiles
spark = SparkSession.builder.appName("Test Streaming").getOrCreate()
# Get the Keystore File and Truststore File
keystore = str(SparkFiles.get('keystore.jks'))
truststore = str(SparkFiles.get('truststore.jks'))
df = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers","kafka.server.com:9093") \
.option("subscribe","TEST_TOPIC") \
.option("startingOffsets", "earliest") \
.option("kafka.security.protocol","SSL") \
.option("kafka.ssl.keystore.location", keystore) \
.option("kafka.ssl.keystore.password", "abcd") \
.option("kafka.ssl.key.password","abcd") \
.option("kafka.ssl.truststore.type","JKS") \
.option("kafka.ssl.truststore.location", truststore) \
.option("kafka.ssl.truststore.password","abcd") \
.option("kafka.ssl.enabled.protocols","TLSv1") \
.option("kafka.ssl.endpoint.identification.algorithm","") \
.load()
....
...
But I still get a NoSuchFileException:
Caused by: org.apache.kafka.common.KafkaException: Failed to load SSL keystore /tmp/spark-4578a498-f96d-4c8a-a716-e128d90531fb/userFiles-5792bc5c-d513-4aa3-9014-26df66ace1db/keystore.jks of type JKS
at org.apache.kafka.common.security.ssl.SslFactory$SecurityStore.load(SslFactory.java:357)
at org.apache.kafka.common.security.ssl.SslFactory.createSSLContext(SslFactory.java:240)
at org.apache.kafka.common.security.ssl.SslFactory.configure(SslFactory.java:141)
... 55 more
Caused by: java.nio.file.NoSuchFileException: /tmp/spark-4578a498-f96d-4c8a-a716-e128d90531fb/userFiles-5792bc5c-d513-4aa3-9014-26df66ace1db/keystore.jks
at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
at sun.nio.fs.UnixFileSystemProvider.newByteChannel(UnixFileSystemProvider.java:214)
at java.nio.file.Files.newByteChannel(Files.java:361)
at java.nio.file.Files.newByteChannel(Files.java:407)
at java.nio.file.spi.FileSystemProvider.newInputStream(FileSystemProvider.java:384)
at java.nio.file.Files.newInputStream(Files.java:152)
at org.apache.kafka.common.security.ssl.SslFactory$SecurityStore.load(SslFactory.java:350)
... 57 more
Where am I going wrong?
Instead of using the SparkFiles.get() method to obtain an absolute path, use the file names directly, and drop the #keystore.jks and #truststore.jks aliases from the --files option. The spark-submit command becomes:
spark-submit --master yarn \
--deploy-mode client \
--jars "/home/aiman/testing_aiman/spark-sql-kafka-0-10_2.11-2.4.0-cdh6.3.4.jar" \
--files /home/aiman/testing_aiman/kafka.keystore.uat.jks,/home/aiman/testing_aiman/kafka.truststore.uat.jks \
sparkStreamingTest.py
Then reference the files by their actual file names:
# The SparkFiles.get() lookups are no longer needed
#keystore = str(SparkFiles.get('keystore.jks'))
#truststore = str(SparkFiles.get('truststore.jks'))
df = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers","kafka.server.com:9093") \
.option("subscribe","TEST_TOPIC") \
.option("startingOffsets", "earliest") \
.option("kafka.security.protocol","SSL") \
.option("kafka.ssl.keystore.location", "kafka.keystore.uat.jks") \
.option("kafka.ssl.keystore.password", "abcd") \
.option("kafka.ssl.key.password","abcd") \
.option("kafka.ssl.truststore.type","JKS") \
.option("kafka.ssl.truststore.location", "kafka.truststore.uat.jks") \
.option("kafka.ssl.truststore.password","abcd") \
.option("kafka.ssl.enabled.protocols","TLSv1") \
.option("kafka.ssl.endpoint.identification.algorithm","") \
.load()
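For context on why the bare file name works: SparkFiles.get() returns a path under the driver's local temp directory, but the Kafka consumers that actually open the keystore run inside the executors, where that driver-local /tmp path does not exist. With --files on YARN, each file is localized into every container's working directory, so a relative location resolves correctly on each executor. The helper below is a hypothetical sketch of that resolution logic only (resolve_on_executor is not a Spark or Kafka API):

```python
import os

def resolve_on_executor(working_dir, location):
    # Hypothetical illustration: a relative keystore location is resolved
    # against the executor container's working directory (where YARN
    # localizes --files), while an absolute path is used as-is -- even if
    # it only exists on the driver machine.
    if os.path.isabs(location):
        return location
    return os.path.join(working_dir, location)

# Relative name: resolves inside the (hypothetical) executor container.
print(resolve_on_executor("/yarn/container_01", "kafka.keystore.uat.jks"))
# Absolute driver path: used verbatim, and likely missing on the executor.
print(resolve_on_executor("/yarn/container_01",
                          "/tmp/spark-4578a498/userFiles-5792bc5c/keystore.jks"))
```

This mirrors the failure in the stack trace: the absolute /tmp/spark-.../keystore.jks path handed to Kafka existed only on the driver, so the executor-side SslFactory threw NoSuchFileException.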