Azure databricks dataframe 写入给出作业中止错误
Azure databricks dataframe write gives job abort error
我正在尝试将数据写入 csv 文件并将该文件存储在 Azure Data Lake Gen2 上,并将 运行 写入作业中止错误消息中。同样的代码以前可以正常工作。
错误信息:
org.apache.spark.SparkException: Job aborted.
代码:
import requests
response = requests.get('https://myapiurl.com/v1/data', auth=('user', 'password'))
data = response.json()
from pyspark.sql import *
df=spark.createDataFrame([Row(**i) for i in data])
df.write.format(source).mode("overwrite").save(path) #error line
我在下面总结了解决方案
如果您想访问 Azure databricks 中的 Azure data lake gen2,您有两种选择。
将 Azure data lake gen2 挂载为 Azure databricks 的文件系统。完成后,就可以读写路径为/mnt/<>
的文件了。我们只需要 运行 代码一次。
一个。创建服务主体并将 Storage Blob Data Contributor 分配给 Data Lake Storage Gen2 存储帐户范围内的 sp
az login
az ad sp create-for-rbac -n "MyApp" --role "Storage Blob Data Contributor" \
--scopes /subscriptions/<subscription>/resourceGroups/<resource-group>/providers/Microsoft.Storage/storageAccounts/<storage-account>
b。代码
configs = {"fs.azure.account.auth.type": "OAuth",
"fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
"fs.azure.account.oauth2.client.id": "<appId>",
"fs.azure.account.oauth2.client.secret": "<clientSecret>",
"fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/<tenant>/oauth2/token",
"fs.azure.createRemoteFileSystemDuringInitialization": "true"}
dbutils.fs.mount(
source = "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/folder1",
mount_point = "/mnt/flightdata",
extra_configs = configs)
直接使用存储帐户访问密钥访问。
我们可以将代码 spark.conf.set( "fs.azure.account.key.<storage-account-name>.dfs.core.windows.net", "<storage-account-access-key-name>")
添加到我们的脚本中。然后我们就可以读写路径为abfss://<file-system-name>@<storage-account-name>.dfs.core.windows.net/
.
的文件了
例如
from pyspark.sql.types import StringType
spark.conf.set(
"fs.azure.account.key.testadls05.dfs.core.windows.net", "<account access key>")
df = spark.createDataFrame(["10", "11", "13"], StringType()).toDF("age")
df.show()
df.coalesce(1).write.format('csv').option('header', True).mode('overwrite').save('abfss://test@testadls05.dfs.core.windows.net/result_csv')
详情请参考here
我正在尝试将数据写入 csv 文件并将该文件存储在 Azure Data Lake Gen2 上,并将 运行 写入作业中止错误消息中。同样的代码以前可以正常工作。
错误信息:
org.apache.spark.SparkException: Job aborted.
代码:
import requests
response = requests.get('https://myapiurl.com/v1/data', auth=('user', 'password'))
data = response.json()
from pyspark.sql import *
df=spark.createDataFrame([Row(**i) for i in data])
df.write.format(source).mode("overwrite").save(path) #error line
我在下面总结了解决方案
如果您想访问 Azure databricks 中的 Azure data lake gen2,您有两种选择。
将 Azure data lake gen2 挂载为 Azure databricks 的文件系统。完成后,就可以读写路径为
/mnt/<>
的文件了。我们只需要 运行 代码一次。一个。创建服务主体并将 Storage Blob Data Contributor 分配给 Data Lake Storage Gen2 存储帐户范围内的 sp
az login az ad sp create-for-rbac -n "MyApp" --role "Storage Blob Data Contributor" \ --scopes /subscriptions/<subscription>/resourceGroups/<resource-group>/providers/Microsoft.Storage/storageAccounts/<storage-account>
b。代码
configs = {"fs.azure.account.auth.type": "OAuth", "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider", "fs.azure.account.oauth2.client.id": "<appId>", "fs.azure.account.oauth2.client.secret": "<clientSecret>", "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/<tenant>/oauth2/token", "fs.azure.createRemoteFileSystemDuringInitialization": "true"} dbutils.fs.mount( source = "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/folder1", mount_point = "/mnt/flightdata", extra_configs = configs)
直接使用存储帐户访问密钥访问。
我们可以将代码
的文件了spark.conf.set( "fs.azure.account.key.<storage-account-name>.dfs.core.windows.net", "<storage-account-access-key-name>")
添加到我们的脚本中。然后我们就可以读写路径为abfss://<file-system-name>@<storage-account-name>.dfs.core.windows.net/
.例如
from pyspark.sql.types import StringType spark.conf.set( "fs.azure.account.key.testadls05.dfs.core.windows.net", "<account access key>") df = spark.createDataFrame(["10", "11", "13"], StringType()).toDF("age") df.show() df.coalesce(1).write.format('csv').option('header', True).mode('overwrite').save('abfss://test@testadls05.dfs.core.windows.net/result_csv')
详情请参考here