Azure Data Factory Copy
I have an Azure Data Factory Copy activity in a pipeline. The copy activity is working - but the data gets copied over multiple times. My data source is an Azure NoSQL DB (DocumentDB). How do I configure the Copy activity so that it does not re-copy records it has already copied?
Here is my activity:
{
    "name": "Copy Usage Session Data",
    "properties":
    {
        "description": "",
        "activities":
        [
            {
                "type": "Copy",
                "typeProperties":
                {
                    "source": {"type": "DocumentDbCollectionSource"},
                    "sink":
                    {
                        "type": "SqlSink",
                        "writeBatchSize": 0,
                        "writeBatchTimeout": "05:00:00",
                        "sliceIdentifierColumnName": "InstallationSliceIdentifier"
                    },
                    "translator":
                    {
                        "type": "TabularTranslator",
                        "ColumnMappings": "machineKey: machineKey, product: product, softwareVersion: softwareVersion, id: DocumentDBId"
                    }
                },
                "inputs": [{"name": "Machine Registration Input Data"}],
                "outputs": [{"name": "Machine Registration Output Data"}],
                "policy":
                {
                    "timeout": "01:00:00",
                    "concurrency": 1,
                    "executionPriorityOrder": "OldestFirst"
                },
                "scheduler":
                {
                    "frequency": "Hour",
                    "interval": 1
                },
                "name": "Machine Registration Data To History",
                "description": "Copy Machine Registration Data To SQL Server DB Activity"
            },
            {
                "type": "Copy",
                "typeProperties":
                {
                    "source": {"type": "DocumentDbCollectionSource"},
                    "sink":
                    {
                        "type": "SqlSink",
                        "writeBatchSize": 0,
                        "writeBatchTimeout": "05:00:00",
                        "sliceIdentifierColumnName": "UsageSessionSliceIdentifier"
                    },
                    "translator":
                    {
                        "type": "TabularTranslator",
                        "ColumnMappings": "id: usageSessionId, usageInstallationId: usageInstallationId, startTime: startTime, stopTime: stopTime, currentVersion: currentVersion"
                    }
                },
                "inputs": [{"name": "Usage Session Input Data"}],
                "outputs": [{"name": "Usage Session Output Data"}],
                "policy":
                {
                    "timeout": "01:00:00",
                    "concurrency": 2,
                    "executionPriorityOrder": "OldestFirst"
                },
                "scheduler":
                {
                    "frequency": "Hour",
                    "interval": 1
                },
                "name": "Usage Session Data To History",
                "description": "Copy Usage Session Data To SQL Server DB Activity"
            }
        ],
        "start": "2017-05-29T16:15:00Z",
        "end": "2500-01-01T00:00:00Z",
        "isPaused": false,
        "pipelineMode": "Scheduled"
    }
}
You can use a query that filters on a created/modified date (such a field should exist in your collection) and selects only the records belonging to the current slice. The slice start and end dates are supplied to the query, so each run reads only the records created within that window.
Change the pipeline start date to the current date. If the pipeline start date is in the past, many data slices are created from that date up to the current date, and all of them get copied. Also, you have set Concurrency: 2, which means two activities run at a time.
For example, if your output dataset availability is one day and your pipeline start date is 29-05-2017, then by today, 16-06-2017, a total of 18 data slices will have been created, one per day. With concurrency set to 2, two copy activities run at a time; with Concurrency: 10, ten copy activities run in parallel.
Pay attention to the output dataset availability, the pipeline start date, the concurrency, and the source query.
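For reference, a minimal sketch of an output dataset whose availability matches your hourly scheduler, so that each slice covers a one-hour window. The dataset type, linked service name, and table name below are placeholders, not taken from your setup:

{
    "name": "Usage Session Output Data",
    "properties": {
        "type": "AzureSqlTable",
        "linkedServiceName": "AzureSqlLinkedService",
        "typeProperties": {
            "tableName": "UsageSessionHistory"
        },
        "availability": {
            "frequency": "Hour",
            "interval": 1
        }
    }
}

The availability section of the output dataset is what drives slice generation between the pipeline start and end dates, so keeping it aligned with the activity scheduler avoids overlapping windows.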
An example source query is $$Text.Format('select * from c where c.ModifiedDate >= \'{0:yyyy-MM-ddTHH:mm:ssZ}\' AND c.ModifiedDate < \'{1:yyyy-MM-ddTHH:mm:ssZ}\'', WindowStart, WindowEnd), where ModifiedDate is a property that records when the document was created in that particular collection.
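Applied to your pipeline, the source of each copy activity would look roughly like the sketch below. It assumes your documents carry a ModifiedDate property stored as an ISO 8601 string; note also that inside the pipeline JSON the single quotes in the format string have to be escaped as \\':

"source": {
    "type": "DocumentDbCollectionSource",
    "query": "$$Text.Format('select * from c where c.ModifiedDate >= \\'{0:yyyy-MM-ddTHH:mm:ssZ}\\' AND c.ModifiedDate < \\'{1:yyyy-MM-ddTHH:mm:ssZ}\\'', WindowStart, WindowEnd)"
}

WindowStart and WindowEnd resolve to the start and end of the slice being processed, so re-running or backfilling a slice re-reads only that hour's documents instead of the whole collection.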
Update:
{
    "name": "DocDbToBlobPipeline",
    "properties": {
        "activities": [
            {
                "type": "Copy",
                "typeProperties": {
                    "source": {
                        "type": "DocumentDbCollectionSource",
                        "query": "SELECT Person.Id, Person.Name.First AS FirstName, Person.Name.Middle as MiddleName, Person.Name.Last AS LastName FROM Person",
                        "nestingSeparator": "."
                    },
                    "sink": {
                        "type": "BlobSink",
                        "blobWriterAddHeader": true,
                        "writeBatchSize": 1000,
                        "writeBatchTimeout": "00:00:59"
                    }
                },
                "inputs": [
                    {
                        "name": "PersonDocumentDbTable"
                    }
                ],
                "outputs": [
                    {
                        "name": "PersonBlobTableOut"
                    }
                ],
                "policy": {
                    "concurrency": 1
                },
                "name": "CopyFromDocDbToBlob"
            }
        ],
        "start": "2015-04-01T00:00:00Z",
        "end": "2015-04-02T00:00:00Z"
    }
}
Take a look at Data Factory scheduling and execution for your reference.