Error while passing arguments to a PySpark script in Azure Data Factory
I am running a PySpark script from Azure Data Factory.
I have specified the arguments as shown below in the section provided under Script/Jar.
The arguments are key-value pairs.
The arguments are submitted fine, as shown below.
--arg '--APP_NAME ABC' --arg '--CONFIG_FILE_PATH wasbs://ABC --arg '--OUTPUT_INFO wasbs://XYZ
I am getting the following error while executing the pipeline.
usage: Data.py [-h] --CONFIG_FILE_PATH CONFIG_FILE_PATH --OUTPUT_INFO
OUTPUT_INFO --ACTION_CODE ACTION_CODE --RUN_ID RUN_ID
--APP_NAME APP_NAME --JOB_ID JOB_ID --TASK_ID TASK_ID
--PCS_ID PCS_ID --DAG_ID DAG_ID
Data.py: error: argument --CONFIG_FILE_PATH is required.
You can pass arguments to a PySpark script in Azure Data Factory. Note that each flag and its value are listed as separate entries in the activity's arguments array, rather than being quoted together as a single string.
Code:
{
    "name": "SparkActivity",
    "properties": {
        "activities": [
            {
                "name": "Spark1",
                "type": "HDInsightSpark",
                "dependsOn": [],
                "policy": {
                    "timeout": "7.00:00:00",
                    "retry": 0,
                    "retryIntervalInSeconds": 30,
                    "secureOutput": false,
                    "secureInput": false
                },
                "userProperties": [],
                "typeProperties": {
                    "rootPath": "adftutorial/spark/script",
                    "entryFilePath": "WordCount_Spark.py",
                    "arguments": [
                        "--input-file",
                        "wasb://sampledata@chepra.blob.core.windows.net/data",
                        "--output-file",
                        "wasb://sampledata@chepra.blob.core.windows.net/results"
                    ],
                    "sparkJobLinkedService": {
                        "referenceName": "AzureBlobStorage1",
                        "type": "LinkedServiceReference"
                    }
                },
                "linkedServiceName": {
                    "referenceName": "HDInsight",
                    "type": "LinkedServiceReference"
                }
            }
        ],
        "annotations": []
    },
    "type": "Microsoft.DataFactory/factories/pipelines"
}
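With the arguments array above, each flag and its value reach the entry script as separate elements of sys.argv. A minimal sketch of how such a script could consume them with argparse (hypothetical; the tutorial's actual WordCount_Spark.py may read its paths differently):

# argparse_sketch.py - hypothetical entry-script parser for the two flags
# passed by the HDInsightSpark activity above.
import argparse

parser = argparse.ArgumentParser()
# "--input-file" and its value arrive as two separate argv elements,
# exactly as they are listed as two separate entries in "arguments".
parser.add_argument("--input-file", required=True)
parser.add_argument("--output-file", required=True)
args = parser.parse_args()

print(args.input_file, args.output_file)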
Walkthrough of passing arguments in ADF:
Here are some examples of passing arguments in Azure Data Factory:
{
    "name": "SparkSubmit",
    "properties": {
        "description": "Submit a spark job",
        "activities": [
            {
                "type": "HDInsightMapReduce",
                "typeProperties": {
                    "className": "com.adf.spark.SparkJob",
                    "jarFilePath": "libs/spark-adf-job-bin.jar",
                    "jarLinkedService": "StorageLinkedService",
                    "arguments": [
                        "--jarFile",
                        "libs/sparkdemoapp_2.10-1.0.jar",
                        "--jars",
                        "/usr/hdp/current/hadoop-client/hadoop-azure-2.7.1.2.3.3.0-3039.jar,/usr/hdp/current/hadoop-client/lib/azure-storage-2.2.0.jar",
                        "--mainClass",
                        "com.adf.spark.demo.Demo",
                        "--master",
                        "yarn-cluster",
                        "--driverMemory",
                        "2g",
                        "--driverExtraClasspath",
                        "/usr/lib/hdinsight-logging/*",
                        "--executorCores",
                        "1",
                        "--executorMemory",
                        "4g",
                        "--sparkHome",
                        "/usr/hdp/current/spark-client",
                        "--connectionString",
                        "DefaultEndpointsProtocol=https;AccountName=<YOUR_ACCOUNT>;AccountKey=<YOUR_KEY>",
                        "input=wasb://input@<YOUR_ACCOUNT>.blob.core.windows.net/data",
                        "output=wasb://output@<YOUR_ACCOUNT>.blob.core.windows.net/results"
                    ]
                },
                "inputs": [
                    {
                        "name": "input"
                    }
                ],
                "outputs": [
                    {
                        "name": "output"
                    }
                ],
                "policy": {
                    "executionPriorityOrder": "OldestFirst",
                    "timeout": "01:00:00",
                    "concurrency": 1,
                    "retry": 1
                },
                "scheduler": {
                    "frequency": "Day",
                    "interval": 1
                },
                "name": "Spark Launcher",
                "description": "Submits a Spark Job",
                "linkedServiceName": "HDInsightLinkedService"
            }
        ],
        "start": "2015-11-16T00:00:01Z",
        "end": "2015-11-16T23:59:00Z",
        "isPaused": false,
        "pipelineMode": "Scheduled"
    }
}
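Applying this to the error in the question: quoting the flag and its value together, as in --arg '--APP_NAME ABC', delivers them to the script fused into a single argv element. argparse in Data.py cannot match that token to any of its options, so it reports that --CONFIG_FILE_PATH is missing. A short illustration, assuming Data.py declares its options with argparse as the usage message suggests (the parser below is a hypothetical reconstruction limited to two of the flags):

# data_py_sketch.py - hypothetical reconstruction of part of Data.py's parser,
# based only on the usage message shown in the question.
import argparse

parser = argparse.ArgumentParser(prog="Data.py")
parser.add_argument("--APP_NAME", required=True)
parser.add_argument("--CONFIG_FILE_PATH", required=True)

# Fused flag+value in one element (what --arg '--APP_NAME ABC' produces)
# is not recognised as an option, so the required-argument error fires:
#   parser.parse_args(["--APP_NAME ABC", "--CONFIG_FILE_PATH wasbs://ABC"])

# Flag and value as separate elements, matching the "arguments" arrays above,
# parse cleanly:
args = parser.parse_args(["--APP_NAME", "ABC", "--CONFIG_FILE_PATH", "wasbs://ABC"])
print(args.APP_NAME, args.CONFIG_FILE_PATH)

So in the activity definition, list "--APP_NAME", "ABC", "--CONFIG_FILE_PATH", "wasbs://ABC", and so on as individual entries of the arguments array instead of quoting each flag together with its value.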