Azure 数据工厂 V2 - 输入和输出
Azure Data Factory V2 - Input and Output
我正在尝试根据以下 github 存储库重现以下架构:https://github.com/Azure/cortana-intelligence-price-optimization
问题是链接到 ADF 的部分,因为在指南中它使用旧版本的 ADF:我不知道如何在 ADF v2 中映射 "input" 和 "output"单个 activity 的属性,以便它们指向数据集。
管道执行一个 spark activity,它只是执行一个 python 脚本,然后我认为它应该将数据写入我已经定义的数据集中。
这里是指南中ADF V1管道的json,我无法复制:
"activities": [
{
"type": "HDInsightSpark",
"typeProperties": {
"rootPath": "adflibs",
"entryFilePath": "Sales_Data_Aggregation_2.0_blob.py",
"arguments": [ "modelsample" ],
"getDebugInfo": "Always"
},
"outputs": [
{
"name": "BlobStoreAggOutput"
}
],
"policy": {
"timeout": "00:30:00",
"concurrency": 1,
"retry": 1
},
"scheduler": {
"frequency": "Hour",
"interval": 1
},
"name": "AggDataSparkJob",
"description": "Submits a Spark Job",
"linkedServiceName": "HDInsightLinkedService"
},
数据工厂管道中的 Spark activity 在您自己的或按需的 HDInsight 群集上执行 Spark 程序。这篇 article 以数据转换活动文章为基础,该文章概述了数据转换和支持的转换活动。当您使用按需 Spark 链接服务时,数据工厂会自动为您即时创建一个 Spark 集群来处理数据,然后在处理完成后删除该集群。
上传 "Sales_Data_Aggregation_2.0_blob.py" 到附加到 HDInsight 集群的存储帐户并修改 spark 的示例定义 activity 并创建计划触发器和 运行 代码:
这是 Spark JSON 定义的示例 activity:
{
"name": "Spark Activity",
"description": "Description",
"type": "HDInsightSpark",
"linkedServiceName": {
"referenceName": "MyHDInsightLinkedService",
"type": "LinkedServiceReference"
},
"typeProperties": {
"sparkJobLinkedService": {
"referenceName": "MyAzureStorageLinkedService",
"type": "LinkedServiceReference"
},
"rootPath": "adfspark",
"entryFilePath": "test.py",
"sparkConfig": {
"ConfigItem1": "Value"
},
"getDebugInfo": "Failure",
"arguments": [
"SampleHadoopJobArgument1"
]
}
}
希望对您有所帮助。
我正在尝试根据以下 github 存储库重现以下架构:https://github.com/Azure/cortana-intelligence-price-optimization
问题是链接到 ADF 的部分,因为在指南中它使用旧版本的 ADF:我不知道如何在 ADF v2 中映射 "input" 和 "output"单个 activity 的属性,以便它们指向数据集。
管道执行一个 spark activity,它只是执行一个 python 脚本,然后我认为它应该将数据写入我已经定义的数据集中。
这里是指南中ADF V1管道的json,我无法复制:
"activities": [
{
"type": "HDInsightSpark",
"typeProperties": {
"rootPath": "adflibs",
"entryFilePath": "Sales_Data_Aggregation_2.0_blob.py",
"arguments": [ "modelsample" ],
"getDebugInfo": "Always"
},
"outputs": [
{
"name": "BlobStoreAggOutput"
}
],
"policy": {
"timeout": "00:30:00",
"concurrency": 1,
"retry": 1
},
"scheduler": {
"frequency": "Hour",
"interval": 1
},
"name": "AggDataSparkJob",
"description": "Submits a Spark Job",
"linkedServiceName": "HDInsightLinkedService"
},
数据工厂管道中的 Spark activity 在您自己的或按需的 HDInsight 群集上执行 Spark 程序。这篇 article 以数据转换活动文章为基础,该文章概述了数据转换和支持的转换活动。当您使用按需 Spark 链接服务时,数据工厂会自动为您即时创建一个 Spark 集群来处理数据,然后在处理完成后删除该集群。
上传 "Sales_Data_Aggregation_2.0_blob.py" 到附加到 HDInsight 集群的存储帐户并修改 spark 的示例定义 activity 并创建计划触发器和 运行 代码:
这是 Spark JSON 定义的示例 activity:
{
"name": "Spark Activity",
"description": "Description",
"type": "HDInsightSpark",
"linkedServiceName": {
"referenceName": "MyHDInsightLinkedService",
"type": "LinkedServiceReference"
},
"typeProperties": {
"sparkJobLinkedService": {
"referenceName": "MyAzureStorageLinkedService",
"type": "LinkedServiceReference"
},
"rootPath": "adfspark",
"entryFilePath": "test.py",
"sparkConfig": {
"ConfigItem1": "Value"
},
"getDebugInfo": "Failure",
"arguments": [
"SampleHadoopJobArgument1"
]
}
}
希望对您有所帮助。