Azure Data Factory - Bulk Import from Blob to Azure SQL
I have a simple file, FD_GROUP.TXT, with the following contents:
~0100~^~Dairy and Egg Products~
~0200~^~Spices and Herbs~
~0300~^~Baby Foods~
~0400~^~Fats and Oils~
~0500~^~Poultry Products~
I am trying to use Azure Data Factory to bulk import these files (some have 700,000 rows) into a SQL database.
The strategy is to first split the columns on the ^ delimiter, then replace each tilde (~) with an empty string so the tildes are stripped, and then insert.
1. SQL solution:
-- Stage the raw file into a temp table, strip the tildes, then insert into the target table.
DECLARE @CsvFilePath NVARCHAR(1000) = 'D:\CodePurehope\Dev\NutrientData\FD_GROUP.txt';

CREATE TABLE #TempTable
(
    [FoodGroupCode] VARCHAR(666) NOT NULL,
    [FoodGroupDescription] VARCHAR(60) NOT NULL
)

-- BULK INSERT does not accept a variable as the file path, so build dynamic SQL.
DECLARE @sql NVARCHAR(4000) = 'BULK INSERT #TempTable FROM ''' + @CsvFilePath + ''' WITH ( FIELDTERMINATOR =''^'', ROWTERMINATOR =''\n'' )';
EXEC(@sql);

-- Remove the tildes that wrap every field.
UPDATE #TempTable
SET [FoodGroupCode] = REPLACE([FoodGroupCode], '~', ''),
    [FoodGroupDescription] = REPLACE([FoodGroupDescription], '~', '')
GO

INSERT INTO [dbo].[FoodGroupDescriptions]
(
    [FoodGroupCode],
    [FoodGroupDescription]
)
SELECT
    [FoodGroupCode],
    [FoodGroupDescription]
FROM
    #TempTable
GO

DROP TABLE #TempTable
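Side note: on SQL Server 2017 or later, BULK INSERT can strip the quote character itself via its CSV support, which makes the tilde-stripping UPDATE unnecessary. A minimal sketch, assuming the same file and table (FORMAT and FIELDQUOTE are not available on older versions):

BULK INSERT #TempTable
FROM 'D:\CodePurehope\Dev\NutrientData\FD_GROUP.txt'
WITH
(
    FORMAT = 'CSV',         -- enables CSV parsing, SQL Server 2017+
    FIELDQUOTE = '~',       -- treat ~ as the quote character and strip it
    FIELDTERMINATOR = '^',
    ROWTERMINATOR = '\n'
);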
2. SSIS ETL package solution:
A Flat File Source delimited by ^, plus a Derived Column transformation that replaces the unnecessary tildes (~), e.g. REPLACE(FoodGroupCode, "~", "").
How would you do this with Microsoft Azure Data Factory?
I have uploaded FD_GROUP.TXT to an Azure Storage blob as the input, and the output table is ready on the Azure SQL server.
I have:
- 2 linked services: AzureStorage and AzureSQL (a rough sketch of these follows below).
- 2 datasets: the blob as input and the SQL table as output.
- 1 pipeline.
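Roughly, the two linked services look like this (connection-string values are placeholders, not from the original definitions):

{
    "name": "AzureStorageLinkedService",
    "properties": {
        "type": "AzureStorage",
        "typeProperties": {
            "connectionString": "DefaultEndpointsProtocol=https;AccountName=<account>;AccountKey=<key>"
        }
    }
}

{
    "name": "AzureSqlLinkedService",
    "properties": {
        "type": "AzureSqlDatabase",
        "typeProperties": {
            "connectionString": "Server=tcp:<server>.database.windows.net,1433;Database=<database>;User ID=<user>;Password=<password>;Encrypt=True"
        }
    }
}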
FoodGroupDescriptionsAzureBlob settings
{
    "name": "FoodGroupDescriptionsAzureBlob",
    "properties": {
        "structure": [
            {
                "name": "FoodGroupCode",
                "type": "Int32"
            },
            {
                "name": "FoodGroupDescription",
                "type": "String"
            }
        ],
        "published": false,
        "type": "AzureBlob",
        "linkedServiceName": "AzureStorageLinkedService",
        "typeProperties": {
            "fileName": "FD_GROUP.txt",
            "folderPath": "nutrition-data/NutrientData/",
            "format": {
                "type": "TextFormat",
                "rowDelimiter": "\n",
                "columnDelimiter": "^"
            }
        },
        "availability": {
            "frequency": "Minute",
            "interval": 15
        }
    }
}
FoodGroupDescriptionsSQLAzure settings
{
    "name": "FoodGroupDescriptionsSQLAzure",
    "properties": {
        "structure": [
            {
                "name": "FoodGroupCode",
                "type": "Int32"
            },
            {
                "name": "FoodGroupDescription",
                "type": "String"
            }
        ],
        "published": false,
        "type": "AzureSqlTable",
        "linkedServiceName": "AzureSqlLinkedService",
        "typeProperties": {
            "tableName": "FoodGroupDescriptions"
        },
        "availability": {
            "frequency": "Minute",
            "interval": 15
        }
    }
}
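The target table's DDL is not in the original post; inferred from the dataset structure above, it would be roughly:

-- Sketch of the sink table; the actual column types may differ.
CREATE TABLE [dbo].[FoodGroupDescriptions]
(
    [FoodGroupCode] INT NOT NULL,
    [FoodGroupDescription] NVARCHAR(60) NOT NULL
);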
FoodGroupDescriptionsPipeline settings
{
    "name": "FoodGroupDescriptionsPipeline",
    "properties": {
        "description": "Copy data from a blob to Azure SQL table",
        "activities": [
            {
                "type": "Copy",
                "typeProperties": {
                    "source": {
                        "type": "BlobSource"
                    },
                    "sink": {
                        "type": "SqlSink",
                        "writeBatchSize": 10000,
                        "writeBatchTimeout": "60.00:00:00"
                    }
                },
                "inputs": [
                    {
                        "name": "FoodGroupDescriptionsAzureBlob"
                    }
                ],
                "outputs": [
                    {
                        "name": "FoodGroupDescriptionsSQLAzure"
                    }
                ],
                "policy": {
                    "timeout": "01:00:00",
                    "concurrency": 1,
                    "executionPriorityOrder": "NewestFirst"
                },
                "scheduler": {
                    "frequency": "Minute",
                    "interval": 15
                },
                "name": "CopyFromBlobToSQL",
                "description": "Bulk Import FoodGroupDescriptions"
            }
        ],
        "start": "2015-07-13T00:00:00Z",
        "end": "2015-07-14T00:00:00Z",
        "isPaused": false,
        "hubName": "gymappdatafactory_hub",
        "pipelineMode": "Scheduled"
    }
}
This does not work in Azure Data Factory, and I don't know how to apply the replace in this scenario. Any help is appreciated.
I took your code and was able to get it working by doing the following:
In your FoodGroupDescriptionsAzureBlob JSON definition, you need to add "external": true to the properties node. The blob input file is created by an external source rather than by an Azure Data Factory pipeline; setting this to true lets Azure Data Factory know that the input should be ready for use.
Also add "quoteChar": "~" to the "format" node in the blob input definition, since the data appears to be wrapped in "~". This strips those characters from the data, so the Int32 column you defined will insert correctly into your SQL table.
Full blob definition:
{
    "name": "FoodGroupDescriptionsAzureBlob",
    "properties": {
        "structure": [
            {
                "name": "FoodGroupCode",
                "type": "Int32"
            },
            {
                "name": "FoodGroupDescription",
                "type": "String"
            }
        ],
        "published": false,
        "type": "AzureBlob",
        "linkedServiceName": "AzureStorageLinkedService",
        "typeProperties": {
            "fileName": "FD_Group.txt",
            "folderPath": "nutrition-data/NutrientData/",
            "format": {
                "type": "TextFormat",
                "rowDelimiter": "\n",
                "columnDelimiter": "^",
                "quoteChar": "~"
            }
        },
        "availability": {
            "frequency": "Minute",
            "interval": 15
        },
        "external": true,
        "policy": {}
    }
}
Since you set the interval to every 15 minutes and the pipeline's start and end dates span a full day, this will create one slice every 15 minutes for the entire pipeline duration. Since you only want this to run once, change the start and end to:
"start": "2015-07-13T00:00:00Z",
"end": "2015-07-13T00:15:00Z",
This will create exactly one slice.
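One more note: if a slice ever re-runs, the Copy activity will append the same rows again. ADF's SqlSink supports a sqlWriterCleanupScript that runs before each slice writes, which keeps re-runs repeatable. A sketch (the DELETE statement is an assumption about your table, not something from your post):

"sink": {
    "type": "SqlSink",
    "sqlWriterCleanupScript": "DELETE FROM [dbo].[FoodGroupDescriptions]",
    "writeBatchSize": 10000,
    "writeBatchTimeout": "60.00:00:00"
}

After the slice completes you can sanity-check the load with a simple SELECT COUNT(*) FROM [dbo].[FoodGroupDescriptions] against the expected row count.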
Hope this helps.