Using ADF REST connector to read and transform FHIR data
I am trying to use Azure Data Factory to read data from a FHIR server and transform the results into newline delimited JSON (ndjson) files in Azure Blob Storage. Specifically, if you query a FHIR server, you might get something like:
{
    "resourceType": "Bundle",
    "id": "som-id",
    "type": "searchset",
    "link": [
        {
            "relation": "next",
            "url": "https://fhirserver/?ct=token"
        },
        {
            "relation": "self",
            "url": "https://fhirserver/"
        }
    ],
    "entry": [
        {
            "fullUrl": "https://fhirserver/Organization/1234",
            "resource": {
                "resourceType": "Organization",
                "id": "1234"
                // More fields
            }
        },
        {
            "fullUrl": "https://fhirserver/Organization/456",
            "resource": {
                "resourceType": "Organization",
                "id": "456"
                // More fields
            }
        }
        // More resources
    ]
}
Basically a bundle of resources. I would like to transform it into a newline delimited (aka ndjson) file where each line is just the json for a resource:
{"resourceType": "Organization", "id": "1234", // More fields }
{"resourceType": "Organization", "id": "456", // More fields }
// More lines with resources
I was able to set up the REST connector and it can query the FHIR server (including pagination), but no matter what I try I cannot seem to generate the output I want. I set up an Azure Blob Storage dataset:
{
    "name": "AzureBlob1",
    "properties": {
        "linkedServiceName": {
            "referenceName": "AzureBlobStorage1",
            "type": "LinkedServiceReference"
        },
        "type": "AzureBlob",
        "typeProperties": {
            "format": {
                "type": "JsonFormat",
                "filePattern": "setOfObjects"
            },
            "fileName": "myout.json",
            "folderPath": "outfhirfromadf"
        }
    },
    "type": "Microsoft.DataFactory/factories/datasets"
}
And configure a copy activity:
{
    "name": "pipeline1",
    "properties": {
        "activities": [
            {
                "name": "Copy Data1",
                "type": "Copy",
                "policy": {
                    "timeout": "7.00:00:00",
                    "retry": 0,
                    "retryIntervalInSeconds": 30,
                    "secureOutput": false,
                    "secureInput": false
                },
                "typeProperties": {
                    "source": {
                        "type": "RestSource",
                        "httpRequestTimeout": "00:01:40",
                        "requestInterval": "00.00:00:00.010"
                    },
                    "sink": {
                        "type": "BlobSink"
                    },
                    "enableStaging": false,
                    "translator": {
                        "type": "TabularTranslator",
                        "schemaMapping": {
                            "resource": "resource"
                        },
                        "collectionReference": "$.entry"
                    }
                },
                "inputs": [
                    {
                        "referenceName": "FHIRSource",
                        "type": "DatasetReference"
                    }
                ],
                "outputs": [
                    {
                        "referenceName": "AzureBlob1",
                        "type": "DatasetReference"
                    }
                ]
            }
        ]
    },
    "type": "Microsoft.DataFactory/factories/pipelines"
}
But at the end (in spite of the schema mapping configured), the end result in the blob is always just the raw bundle returned from the server. If I configure the output blob to be a comma delimited text, I can extract fields and create a flattened tabular view, but that is not really what I want.
Any suggestions would be much appreciated.
As briefly discussed in the comments, the Copy Activity does not provide much functionality aside from mapping data. As stated in the documentation, the Copy activity does the following operations:
- Reads data from a source data store.
- Performs serialization/deserialization, compression/decompression, column mapping, etc. It does these operations based on the configurations of the input dataset, output dataset, and Copy Activity.
- Writes data to the sink/destination data store.
It does not look like the Copy Activity does anything other than copying stuff around efficiently.
What I found to be useful instead is to use Databricks.
Here are the steps:
- Add a Databricks account to your subscription;
- Go to the Databricks page by clicking the authoring button;
- Create a notebook;
- Write the script (Scala, Python, or .NET, which was recently announced).
The script would do the following:
- Read the data from Blob storage;
- Filter out and transform the data as needed;
- Write the data back to Blob storage;
You can test your script from there and, once ready, you can go back to your pipeline and create a Notebook activity that points to the notebook containing the script.
I struggled coding in Scala but it was worth it :)
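As a rough sketch of the transformation such a notebook script would perform, here is the core bundle-to-ndjson step in plain Python (in an actual Databricks notebook you would wrap this in Spark or Azure storage SDK calls to read from and write back to Blob Storage; the function and sample names here are illustrative, not from the original post):

```python
import json

def bundle_to_ndjson(bundle_json: str) -> str:
    """Pull each entry's resource out of a FHIR searchset Bundle
    and emit one JSON object per line (ndjson)."""
    bundle = json.loads(bundle_json)
    lines = [json.dumps(entry["resource"]) for entry in bundle.get("entry", [])]
    return "\n".join(lines)

# Minimal sample bundle with two Organization resources,
# mirroring the shape shown in the question:
sample = json.dumps({
    "resourceType": "Bundle",
    "type": "searchset",
    "entry": [
        {"fullUrl": "https://fhirserver/Organization/1234",
         "resource": {"resourceType": "Organization", "id": "1234"}},
        {"fullUrl": "https://fhirserver/Organization/456",
         "resource": {"resourceType": "Organization", "id": "456"}},
    ],
})

print(bundle_to_ndjson(sample))
# {"resourceType": "Organization", "id": "1234"}
# {"resourceType": "Organization", "id": "456"}
```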
So I found a solution. If I do an initial conversion step where I simply dump the bundle into a JSON file, and then do a second conversion from the JSON file into what I pretend to be a text file in another blob, I can get the ndjson file created.
Basically, define another blob dataset:
{
    "name": "AzureBlob2",
    "properties": {
        "linkedServiceName": {
            "referenceName": "AzureBlobStorage1",
            "type": "LinkedServiceReference"
        },
        "type": "AzureBlob",
        "structure": [
            {
                "name": "Prop_0",
                "type": "String"
            }
        ],
        "typeProperties": {
            "format": {
                "type": "TextFormat",
                "columnDelimiter": ",",
                "rowDelimiter": "",
                "quoteChar": "",
                "nullValue": "\N",
                "encodingName": null,
                "treatEmptyAsNull": true,
                "skipLineCount": 0,
                "firstRowAsHeader": false
            },
            "fileName": "myout.json",
            "folderPath": "adfjsonout2"
        }
    },
    "type": "Microsoft.DataFactory/factories/datasets"
}
Notice the TextFormat, and also notice that the quoteChar is blank. If I then add another Copy Activity:
{
    "name": "pipeline1",
    "properties": {
        "activities": [
            {
                "name": "Copy Data1",
                "type": "Copy",
                "policy": {
                    "timeout": "7.00:00:00",
                    "retry": 0,
                    "retryIntervalInSeconds": 30,
                    "secureOutput": false,
                    "secureInput": false
                },
                "typeProperties": {
                    "source": {
                        "type": "RestSource",
                        "httpRequestTimeout": "00:01:40",
                        "requestInterval": "00.00:00:00.010"
                    },
                    "sink": {
                        "type": "BlobSink"
                    },
                    "enableStaging": false,
                    "translator": {
                        "type": "TabularTranslator",
                        "schemaMapping": {
                            "['resource']": "resource"
                        },
                        "collectionReference": "$.entry"
                    }
                },
                "inputs": [
                    {
                        "referenceName": "FHIRSource",
                        "type": "DatasetReference"
                    }
                ],
                "outputs": [
                    {
                        "referenceName": "AzureBlob1",
                        "type": "DatasetReference"
                    }
                ]
            },
            {
                "name": "Copy Data2",
                "type": "Copy",
                "dependsOn": [
                    {
                        "activity": "Copy Data1",
                        "dependencyConditions": [
                            "Succeeded"
                        ]
                    }
                ],
                "policy": {
                    "timeout": "7.00:00:00",
                    "retry": 0,
                    "retryIntervalInSeconds": 30,
                    "secureOutput": false,
                    "secureInput": false
                },
                "typeProperties": {
                    "source": {
                        "type": "BlobSource",
                        "recursive": true
                    },
                    "sink": {
                        "type": "BlobSink"
                    },
                    "enableStaging": false,
                    "translator": {
                        "type": "TabularTranslator",
                        "columnMappings": {
                            "resource": "Prop_0"
                        }
                    }
                },
                "inputs": [
                    {
                        "referenceName": "AzureBlob1",
                        "type": "DatasetReference"
                    }
                ],
                "outputs": [
                    {
                        "referenceName": "AzureBlob2",
                        "type": "DatasetReference"
                    }
                ]
            }
        ]
    },
    "type": "Microsoft.DataFactory/factories/pipelines"
}
Then it all works out. It is not ideal in that I now have two copies of the data in blobs, but one can easily be deleted, I suppose.
I would still love to hear about it if somebody has a one-step solution.
For anyone finding this post in the future: you can do this with the $export API call. Note that you must have a storage account linked to your FHIR server.
https://build.fhir.org/ig/HL7/bulk-data/export.html#endpoint---system-level-export
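To illustrate the kick-off request for a system-level export, here is a minimal sketch in Python. The base URL is a placeholder for your own FHIR endpoint; the `Accept` and `Prefer` headers follow the Bulk Data Access spec's asynchronous request pattern (the server should respond 202 and point you to a status endpoint via `Content-Location`). This only builds the request and is not a complete polling client:

```python
import urllib.request

# Placeholder base URL -- substitute your own FHIR server endpoint.
FHIR_BASE = "https://fhirserver"

def build_export_request(base_url: str) -> urllib.request.Request:
    """Build the system-level $export kick-off request per the
    FHIR Bulk Data Access spec (async request pattern)."""
    req = urllib.request.Request(f"{base_url}/$export", method="GET")
    # Headers required by the bulk-data export kick-off:
    req.add_header("Accept", "application/fhir+json")
    req.add_header("Prefer", "respond-async")
    return req

req = build_export_request(FHIR_BASE)
print(req.full_url)              # https://fhirserver/$export
print(req.get_header("Prefer"))  # respond-async
```

Sending it with `urllib.request.urlopen(req)` would then kick off the export, after which the server writes ndjson files to the linked storage account.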