Azure Synapse Pipeline 中的动态 REST 调用

Dynamic REST calls in Azure Synapse Pipeline

我正在使用 Azure Synapse 调用 REST API,return 数据集如下所示:

{
"links": [
    {
        "rel": "next",
        "href": "[myRESTendpoint]?limit=1000&offset=1000"
    },
    {
        "rel": "last",
        "href": "[myRESTendpoint]?limit=1000&offset=60000"
    },
    {
        "rel": "self",
        "href": "[myRESTendpoint]"
    }
],
"count": 1000,
"hasMore": true,
"items": [
    {
        "links": [],
        "closedate": "6/16/2014",
        "id": "16917",
        "number": "62000",
        "status": "H",
        "tranid": "0062000"
    },...
],
"offset": 0,
"totalResults": 60316
}

我熟悉对单个端点进行 REST 调用,该端点可以 return 使用 Synapse 管道通过一次调用获取所有数据,但是这个特定的 REST 端点仅对 returning 1000 条记录,但它确实给出了一个名为“hasMore”的 属性。

有没有一种方法可以递归地在 Synapse 管道中进行休息调用,直到“hasMore”属性 等于 false?

最终目标是将数据汇入专用 SQL 池或 ADLS2 并从那里转换。

我尝试使用 Azure 数据工厂来实现相同的场景,这似乎更合适也更容易实现目标“最终目标是将数据接收到专用的 SQL 池或进入 ADLS2 并从那里转换。

由于您必须递归地访问页面以获取 1000 条记录,如果响应 header/response 正文包含下一页的 URL,您可以按以下方式设置它。

如果下一页 link 或查询参数未包含在响应中,您将不太可能使用该功能 headers/body。

或者,您可以利用循环逻辑并执行复制 Activity。

在 Rest 连接器中创建两个参数:

填写 RestConnector 的相关参数 URL。

使用设置变量操作,这个变量的值将在循环中增加。对于每个循环,CopyActivity的URL是动态set.If你想循环或者迭代,你可以使用Untilactivity.

选择:

根据我的经验,REST 连接分页非常严格。通常将动作放在一个循环中。结果,有更多的控制权。 FOREACH 循环,here

对于关注该主题的人,我采纳了 IpsitaDash-MT 的建议,使用 ForEach 循环。在这个 API 的情况下,当进行调用时,我在名为“totalResults”的调用结束时得到一个 属性 returned。以下是我用来实现我想要做的事情的步骤:

  1. 对 API 进行虚拟调用以获取“totalResults”参数。这只是对 return 我希望获得的结果数量的调用。在这个 API 的情况下,请求的主体是一个 SQL 语句,所以当发出虚拟请求时,我只询问我想要获得的结果的 ID。

SQL statement example

  1. 然后我从该请求中获取 属性“totalResults”,在 ForEach 循环的“Items”中设置一个动态值,如下所示:

    @range(0,add(div(sub(int(activity('Get Pages Customers').output.totalResults),mod(int( activity('Get Pages Customers').output.totalResults),1000)),1000),1))

注意: API 只允许 1000 个结果的页面,我做了一些数学运算以获得页码范围。我还必须在最终结果中加 1 以包括最后一页。

ForEach Loop Settings

  1. 在API中我有两个可以传递的参数“limit”和“offset”。因为我想要所有数据,所以没有理由将限制设置为 1000(最大允许数量)以外的任何值。 offset参数可以设置为小于等于“totalResults”-“limit”且大于等于0的任意数。所以我使用步骤2中建立的范围乘以1000来设置offset参数在URL.

Setting the offset parameter in the copy data activity

Dynamic value of the Relative URL in the REST connector

注意: 我发现最好先将数据作为 JSON 放入 ADLS2 而不是放入专用的 SQL 池,因为查找功能.

  1. 由于 synapse 不允许嵌套 ForEach 循环,我 运行 通过数据流来格式化数据并检查重复和更新。

  2. 当数据流完成后,它开始查找 activity 以获取刚刚处理的数据并将其传递到新管道以使用另一个 ForEach 循环来获取 child 数据为 parent 数据的每个 ID。

Data Flow and Lookup for child data pipeline