How to read simple text file from Google Cloud Storage using Spark-Scala local Program
As shown in the blog below,
I am trying to read a file from Google Cloud Storage using Spark-Scala.
For this, I imported the Google Cloud Storage Connector and Google Cloud Storage as follows,
// https://mvnrepository.com/artifact/com.google.cloud/google-cloud-storage
compile group: 'com.google.cloud', name: 'google-cloud-storage', version: '0.7.0'
// https://mvnrepository.com/artifact/com.google.cloud.bigdataoss/gcs-connector
compile group: 'com.google.cloud.bigdataoss', name: 'gcs-connector', version: '1.6.0-hadoop2'
After that I created a simple Scala object file like below,
(created a sparkSession)
val csvData = spark.read.csv("gs://my-bucket/project-data/csv")
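For context, a minimal sketch of that object (the object name, app name, and master setting are my own placeholders; the bucket path is from my project):

import org.apache.spark.sql.SparkSession

// Minimal sketch of the object described above; names are illustrative.
object ReadGcsCsv {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("gcs-read-test")
      .master("local[*]") // running locally from IntelliJ
      .getOrCreate()

    val csvData = spark.read.csv("gs://my-bucket/project-data/csv")
    csvData.show()
  }
}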
But it is throwing the following error,
17/03/01 20:16:02 INFO GoogleHadoopFileSystemBase: GHFS version: 1.6.0-hadoop2
17/03/01 20:16:23 WARN HttpTransport: exception thrown while executing request
java.net.SocketTimeoutException: connect timed out
at java.net.DualStackPlainSocketImpl.waitForConnect(Native Method)
at java.net.DualStackPlainSocketImpl.socketConnect(DualStackPlainSocketImpl.java:85)
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:172)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:589)
at sun.net.NetworkClient.doConnect(NetworkClient.java:175)
at sun.net.www.http.HttpClient.openServer(HttpClient.java:432)
at sun.net.www.http.HttpClient.openServer(HttpClient.java:527)
at sun.net.www.http.HttpClient.<init>(HttpClient.java:211)
at sun.net.www.http.HttpClient.New(HttpClient.java:308)
at sun.net.www.http.HttpClient.New(HttpClient.java:326)
at sun.net.www.protocol.http.HttpURLConnection.getNewHttpClient(HttpURLConnection.java:1169)
at sun.net.www.protocol.http.HttpURLConnection.plainConnect0(HttpURLConnection.java:1105)
at sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:999)
at sun.net.www.protocol.http.HttpURLConnection.connect(HttpURLConnection.java:933)
at com.google.api.client.http.javanet.NetHttpRequest.execute(NetHttpRequest.java:93)
at com.google.api.client.http.HttpRequest.execute(HttpRequest.java:981)
at com.google.cloud.hadoop.util.CredentialFactory$ComputeCredentialWithRetry.executeRefreshToken(CredentialFactory.java:158)
at com.google.api.client.auth.oauth2.Credential.refreshToken(Credential.java:489)
at com.google.cloud.hadoop.util.CredentialFactory.getCredentialFromMetadataServiceAccount(CredentialFactory.java:205)
at com.google.cloud.hadoop.util.CredentialConfiguration.getCredential(CredentialConfiguration.java:70)
at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.configure(GoogleHadoopFileSystemBase.java:1816)
at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.initialize(GoogleHadoopFileSystemBase.java:1003)
at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.initialize(GoogleHadoopFileSystemBase.java:966)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2433)
at org.apache.hadoop.fs.FileSystem.access0(FileSystem.java:88)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2467)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2449)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:367)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:287)
at org.apache.spark.sql.execution.datasources.DataSource.hasMetadata(DataSource.scala:317)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:354)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:149)
at org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:413)
at org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:349)
at test$.main(test.scala:41)
at test.main(test.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
I have set up all the authentication as well. Not sure why the timeout is occurring.
EDIT
I am trying to run the above code through IntelliJ IDEA (Windows).
A JAR of the same code runs fine on Google Cloud Dataproc, but I get the above error when I run it from my local system.
I have installed the Spark, Scala, and Google Cloud plugins in IntelliJ.
One more thing:
I created a Dataproc instance and tried to connect to its external IP address as described in the documentation,
https://cloud.google.com/compute/docs/instances/connecting-to-instance#standardssh
but it could not connect to the server and failed with a timeout error.
You need to set google.cloud.auth.service.account.json.keyfile to the local path of a JSON credential file for a service account you create following these instructions for generating a private key. The stack trace shows that the connector thinks it is running on a GCE VM and is trying to obtain credentials from the local metadata server. If that doesn't work, try setting fs.gs.auth.service.account.json.keyfile instead.
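For example, a minimal sketch of applying this in Scala (the key file path is a placeholder, and the enable flag is an assumption based on the gcs-connector documentation):

// Point the connector at a local service-account JSON key instead of
// the GCE metadata server; the path below is a placeholder.
val conf = spark.sparkContext.hadoopConfiguration
conf.set("google.cloud.auth.service.account.enable", "true")
conf.set("google.cloud.auth.service.account.json.keyfile", "/path/to/keyfile.json")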
When trying to SSH, have you tried gcloud compute ssh <instance name>? You may also need to check your Compute Engine firewall rules to make sure you are allowing inbound connections on port 22.
Thanks to Dennis for pointing me in the right direction. Since I am on Windows, there is no core-site.xml, as Hadoop is not available for Windows.
I downloaded a pre-built Spark and configured the parameters you mentioned in code as follows:
created a SparkSession,
and set the Hadoop parameters on it, e.g. spark.sparkContext.hadoopConfiguration.set("google.cloud.auth.service.account.json.keyfile", "<KeyFile Path>"),
along with all the other parameters that would normally go in core-site.xml.
After setting all of these, the program could access the files in Google Cloud Storage.
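Putting it together, a sketch of the working setup on Windows without a core-site.xml (the filesystem implementation properties follow the gcs-connector documentation; the project ID, key file path, and names are placeholders):

import org.apache.spark.sql.SparkSession

// Sketch of the final working setup; property names are the ones
// that would otherwise live in core-site.xml.
object ReadGcsCsv {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("gcs-read-test")
      .master("local[*]")
      .getOrCreate()

    val conf = spark.sparkContext.hadoopConfiguration
    // Register the GCS filesystem implementations.
    conf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
    conf.set("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")
    conf.set("fs.gs.project.id", "<Project ID>")
    // Authenticate with a local service-account JSON key.
    conf.set("google.cloud.auth.service.account.enable", "true")
    conf.set("google.cloud.auth.service.account.json.keyfile", "<KeyFile Path>")

    val csvData = spark.read.csv("gs://my-bucket/project-data/csv")
    csvData.show()
  }
}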