java.io.FileNotFoundException: Not found cos://mybucket.myservicename/checkpoint/offsets

I am trying to use Spark Structured Streaming 2.3 to read data from Kafka (IBM Message Hub) and save it to IBM Cloud Object Storage, running on an IBM Analytics Engine 1.1 cluster.

After creating the cluster, ssh into it:

$ ssh clsadmin@myclusterid.bi.services.eu-gb.bluemix.net

Create the jaas.conf file that Spark needs in order to talk to Message Hub:

$ cat << EOF > jaas.conf
KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    serviceName="kafka"
    username="<<MY_MESSAGEHUB_USERNAME>>"
    password="<<MY_MESSAGEHUB_PASSWORD>>";
};
EOF

This creates a file named jaas.conf in the /home/wce/clsadmin directory on the cluster.
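
Because jaas.conf holds the Message Hub password in plain text, it may be worth restricting who can read it. The following is only a sketch: `lock_down` is a hypothetical helper name, and it assumes a Linux shell with GNU coreutils (the `stat -c` form is GNU-specific) and that only your own user needs to read the file.

```shell
# Restrict a credentials file to its owner (read/write only)
# and print the resulting permission bits.
lock_down() {
    chmod 600 "$1" && stat -c '%a' "$1"
}
```

Running `lock_down jaas.conf` from the directory that contains the file should print the new mode.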

Create a utility script to start the spark shell (for now we only have one executor):

$ cat << EOF > start_spark.sh
spark-shell --master local[1] \
       --files jaas.conf \
       --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0 \
       --conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config=jaas.conf" \
       --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=jaas.conf" \
       --num-executors 1 --executor-cores 1 
EOF
$ chmod +x start_spark.sh
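
A note on the here-document used above: with an unquoted `EOF`, the shell treats the body much like a double-quoted string, so backslash-newline pairs are joined and any `$` expressions are expanded before the file is written. That is harmless for this script, which simply ends up on one line, but it matters if a generated script contains variables. The sketch below shows the quoted form, which writes the body exactly as typed; `heredoc_demo.sh` is a throwaway file name chosen for illustration.

```shell
# Quoting the delimiter ('EOF') disables expansion inside the here-document,
# so the backslash and $HOME are written to the file literally.
cat << 'EOF' > heredoc_demo.sh
echo one \
     two $HOME
EOF

# Show what was actually written: two lines, with "$HOME" unexpanded.
cat heredoc_demo.sh
```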

Start a spark session using the utility script:

$ ./start_spark.sh

Now, inside the spark shell, read the Kafka (Message Hub) stream. Make sure you change kafka.bootstrap.servers to match your service credentials:

val df = spark.readStream.
                format("kafka").
                option("kafka.bootstrap.servers", "kafka03-prod01.messagehub.services.eu-de.bluemix.net:9093,kafka04-prod01.messagehub.services.eu-de.bluemix.net:9093,kafka01-prod01.messagehub.services.eu-de.bluemix.net:9093,kafka02-prod01.messagehub.services.eu-de.bluemix.net:9093,kafka05-prod01.messagehub.services.eu-de.bluemix.net:9093").
                option("subscribe", "transactions_load").
                option("kafka.security.protocol", "SASL_SSL").
                option("kafka.sasl.mechanism", "PLAIN").
                option("kafka.ssl.protocol", "TLSv1.2").
                option("kafka.ssl.enabled.protocols", "TLSv1.2").
                load()

We can test that the connection works:

df.writeStream.format("console").start()

After a while, you should see some data printed to the console, e.g.

-------------------------------------------                                     
Batch: 1
-------------------------------------------
+--------------------+--------------------+-----------------+---------+------+--------------------+-------------+
|                 key|               value|            topic|partition|offset|           timestamp|timestampType|
+--------------------+--------------------+-----------------+---------+------+--------------------+-------------+
|[35 34 30 33 36 3...|[7B 22 49 6E 76 6...|transactions_load|        7| 84874|2018-08-22 15:42:...|            0|
|[35 34 30 33 36 3...|[7B 22 49 6E 76 6...|transactions_load|        7| 84875|2018-08-22 15:42:...|            0|
|[35 34 30 38 33 3...|[7B 22 49 6E 76 6...|transactions_load|        7| 84876|2018-08-22 15:42:...|            0|
...

Set up the spark session so that it can access the COS instance:

val accessKey = "MY_COS_ACCESS_KEY"
val secretKey = "MY_COS_SECRET_KEY"
val bucketName = "streamingdata"

// arbitrary name for referring to the COS settings from this code
val serviceName = "myservicename"

sc.hadoopConfiguration.set(s"fs.cos.${serviceName}.access.key", accessKey)
sc.hadoopConfiguration.set(s"fs.cos.${serviceName}.secret.key", secretKey)
sc.hadoopConfiguration.set(s"fs.cos.${serviceName}.endpoint", "s3.eu-geo.objectstorage.service.networklayer.com")

We can test that COS is set up correctly by writing a dummy file:

import spark.implicits._

val data = sc.parallelize(Array(1,2,3,4,5))
data.toDF.write.format("csv").save(s"cos://${bucketName}.${serviceName}/data.txt")

spark.read.csv(s"cos://${bucketName}.${serviceName}/data.txt").collect()

If reading from and writing to COS succeeded, the test above should output something like:

res7: Array[org.apache.spark.sql.Row] = Array([1], [2], [3], [4], [5])

Now try to write the streaming dataframe to COS:

df.
  writeStream.
  format("parquet").
  option("checkpointLocation", s"cos://${bucketName}.${serviceName}/checkpoint").
  option("path",               s"cos://${bucketName}.${serviceName}/data").
  start()

For me, this fails with:

scala> 18/08/22 15:43:06 WARN COSAPIClient: file status checkpoint/offsets returned 404
18/08/22 15:43:06 ERROR MicroBatchExecution: Query [id = 78c8c4af-f21d-457d-b5a7-56559e180634, runId = 50e8759e-0293-4fab-9b73-dd4811423b37] terminated with error
java.io.FileNotFoundException: Not found cos://streamingdata.myservicename/checkpoint/offsets
    at com.ibm.stocator.fs.cos.COSAPIClient.getFileStatus(COSAPIClient.java:628)
    at com.ibm.stocator.fs.ObjectStoreFileSystem.getFileStatus(ObjectStoreFileSystem.java:486)
    at com.ibm.stocator.fs.ObjectStoreFileSystem.listStatus(ObjectStoreFileSystem.java:360)
    at com.ibm.stocator.fs.ObjectStoreFileSystem.listStatus(ObjectStoreFileSystem.java:336)
    at org.apache.spark.sql.execution.streaming.HDFSMetadataLog$FileSystemManager.list(HDFSMetadataLog.scala:412)
    at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.getLatest(HDFSMetadataLog.scala:231)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$populateStartOffsets(MicroBatchExecution.scala:180)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$$anonfun$apply$mcZ$sp.apply$mcV$sp(MicroBatchExecution.scala:124)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$$anonfun$apply$mcZ$sp.apply(MicroBatchExecution.scala:121)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$$anonfun$apply$mcZ$sp.apply(MicroBatchExecution.scala:121)
    at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream.apply$mcZ$sp(MicroBatchExecution.scala:121)
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:117)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon.run(StreamExecution.scala:189)

Is this an issue with stocator or with Spark Structured Streaming?

Changing to the S3AFileSystem seems to have resolved the issue:

// the URLs below use the s3a:// scheme, so register the implementation under fs.s3a.impl
sc.hadoopConfiguration.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
sc.hadoopConfiguration.set("fs.s3a.access.key", accessKey)
sc.hadoopConfiguration.set("fs.s3a.secret.key", secretKey)
sc.hadoopConfiguration.set("fs.s3a.endpoint", "s3.eu-geo.objectstorage.service.networklayer.com")

// no trailing slash here: the paths below append "/checkpoint" and "/data"
val s3Url = s"s3a://${bucketName}"

...

df.
  writeStream.
  format("parquet").
  option("checkpointLocation", s"${s3Url}/checkpoint").
  option("path",               s"${s3Url}/data").
  start()

So it looks like the issue is with the stocator driver.


UPDATE 23 Aug 2018: This issue has been fixed in Stocator v1.0.24, but the stocator on IBM Analytics Engine has not been updated to that version yet.
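
To check whether a given stocator jar already contains the fix, you could compare its version number against 1.0.24. The sketch below makes several assumptions: the jar file name is hypothetical (substitute whatever is actually installed on your cluster), the version is assumed to be embedded in the file name, `version_ge` is a helper defined here rather than a standard command, and `sort -V` requires GNU coreutils.

```shell
# Succeeds when version $1 is greater than or equal to version $2.
# Uses `sort -V` (version sort): the smaller of the two versions sorts first.
version_ge() {
    [ "$(printf '%s\n%s\n' "$1" "$2" | sort -V | head -n 1)" = "$2" ]
}

# Hypothetical jar file name, for illustration only.
jar="stocator-1.0.21-jar-with-dependencies.jar"

# Extract the first dotted version number from the file name.
version="$(printf '%s\n' "$jar" | grep -oE '[0-9]+(\.[0-9]+)+' | head -n 1)"

if version_ge "$version" "1.0.24"; then
    echo "stocator $version includes the fix"
else
    echo "stocator $version predates the fix"
fi
```

If the installed version predates the fix, the S3AFileSystem configuration shown above remains a workaround.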