Google 从 SFTP 服务器获取无服务器日志(文件下载)的云平台解决方案

Google Cloud Platform solution for serverless log ingestion (files downloading) from a SFTP server

今天有个问题,打答案的时候被删了(不知道为什么)。由于答案很长,我决定 到 copy/recreate 它并提供我的答案 无论如何。也许对某些人有用。


原题如下:

我们有一个 SFTP 服务器,其中转储了 Apache、Nginx、Wordpress 日志。我们想将这些日志备份到 Cloud Storage 中,同时解析这些日志的内容并插入到 BigQuery 表中。我通常使用 Cloud Functions(NodeJS 或 Python),我首先想到的是首选解决方案。

但是,Cloud Function 有一个触发器,如果​​我的 objective 是一个程序不断地 watch/observe/listen 在 SFTP 文件夹上获取新文件,那么这就没有意义了。如果我的要求不那么严格,我可以按计划触发它,比如每小时读取一次 SFTP 文件夹中的新文件。当新文件被转储到 Cloud Storage 时,Cloud Functions 也会起作用,触发函数解析日志文件并插入到 BigQuery。

如果我坚持不断侦听 SFTP 文件夹的要求,您能否建议一个更好的设计解决方案以及我需要组合哪些 GCP 服务(除了 Cloud Storage 和 BigQuery)才能实现这一目标?

如果要求不那么严格,我的方案够好吗? P.S。我刚刚发现 SFTP 凭据具有只读权限。所以重命名通过添加后缀完成的文件是没有问题的。我应该使用像 MemoryStore 这样的缓存来记住哪些文件已完成吗?

久读。

在我看来,这是一个非常大的问题。解决方案不仅需要代码开发,还需要大量的设计思维和决策(包括一些妥协)。

根据我的个人经验(我开发了两次这样的解决方案,在生产中对其进行了维护等)可以将云功能与一组 GCP 资源一起使用 - 秘密管理器、pubsub 主题、firestore 集合、服务accounts 和 IAM for them 等等......根据你的要求(我不知道细节)和上下文 - 你可能需要创建一个功能组件,其中包含几个(假设在两个和五个之间)不同的云功能。二-如果你的文件很小(每个100M以内),每天的文件数量不大(几千或几万个文件),你有权在下载后从SFTP服务器上删除原始文件。

如果您没有此类权限 - 应该有一些其他进程可以清理 'old' 或 'already downloaded' 文件。否则,最终解决方案将无法工作(当仅下载文件列表,而不是文件,而仅下载文件列表时,需要超过 540 秒)。

SFTP 是一个 'passive' 组件 - 如果有新文件到达,它不会通知我们,因此我们这边应该有一些 'active' 组件来发起与 SFTP 服务器的连接。这是一个 'pull' 交互,并且有规律性 - 即每 10、15 或 20 分钟 - 连接到 SFTP 服务器并检查是否有任何新内容要下载。

接下来。云函数是幂等的,无法 store/keep 仅在云函数内下载文件的状态。应该有一些外部(相对于云功能)服务来维护每个文件下载过程的状态机。我使用了 Firestore。它非常方便并且延迟非常小。 firestore 集合中的每个文档都代表 'process of a file downloading' 的反映 - 状态机以及大量元数据、状态转换历史等。

Cloud Functions 有 2 个重要限制:

  1. 540 秒超时。
  2. 2Gb 内存。

这意味着下载过程(以及任何其他活动)不应超过 540 秒。如果您要将任何数据存储在(云函数的)内存中,则该数据块应小于 2Gb。

超时限制会影响进程吗? - 是的,它可以。整个过程的瓶颈是SFTP服务器和GCP存在点之间的“带宽”。文件越大 - 下载它所需的时间越长,尤其是在并行下载许多文件时。

所以,算法很快就会按以下方式运行:

1/ 第一个云函数每隔 15 分钟触发一次(Cloud Scheduler => PubSub 主题 => 云函数)。云函数读取所有 SFTP 连接和所有数据管道的配置(即来自 GCS 存储桶的 json 文件)(因为此组件可能与许多 SFTP 服务器一起使用,并且对于每个 SFTP 服务器,可能有许多数据管道),然后从 Secret Manager 获取凭据(针对每个 SFTP 服务器),然后连接到 SFTP 服务器,并下载每个 connection/pipeline 的可用文件列表。因此对于我们知道的每个文件——连接(SFTP 服务器)、管道(即源目录)、文件名、文件大小、文件修改时间戳。我对 SFTP 服务器不再有任何期望。对于每个连接和数据管道,我们组成一个文件列表(取决于配置并且应该是灵活的)最多 5、8 或 1 万个文件。该列表作为 json 结构被作为消息推送到 PubSub 主题(如果需要,还有一些额外的元数据)。因此,如果我们有 2 个 SFTP 服务器,每个服务器中有 3 个管道 - 将至少有 6 条消息。如果 SFTP 服务器中的目录包含超过 5K、8K 或 10K 的文件,则可能更多。目前,我们不知道这些文件是否已下载,或者正在下载过程中,或者下载失败,或者这是一个新文件。 部署此函数时 - “max instances”参数的值为 1。

2/ 第二个云功能由包含文件列表的 PubSub 消息触发(对于某些 SFTP 服务器和某些管道)。对于传入列表中的每个文件,云函数应该决定要做什么:

  1. 这是一个新文件,应该下载它。
  2. 这是一个正在进行的下载,我们需要等待更多 - 什么都不做。
  3. 这是一个已经下载的文件,我们什么都不做。
  4. 正在下载,但下载时间太长 - 可能是下载崩溃了,应该重新下载。
  5. 这是...可能还有更多案件需要处理...

现在需要 firestore 集合。集合中的每个文档 - 反映文件发生的情况;一切都记录在那里 - 下载过程何时开始,何时(或是否)完成等等。文档 ID 是根据可用元数据计算的哈希值 - 连接(SFTP 服务器)、管道(即源目录)、源文件名、源文件大小、源文件修改时间戳。所有这些都来自消息。

例如,我们计算哈希并检查集合中是否存在此类文档。如果它不存在 - 创建一个新文档,因为这是一个全新的下载文件。然后撰写 json 消息并将其推送到第二个 PubSub 主题 - 下一个云功能将对其进行处理。它存在 - 有必要决定我们将如何处理它 - 什么都不做(因为它已经下载,或者因为下载可能仍在进行中)或再次触发它的下载 - 组成一个 json消息并将其推送到第二个 PubSub 主题...

部署此函数时 - “最大实例数”参数的值在 4 到 12 之间(根据我的经验)。

3/ 第三个云功能由包含下载文件详细信息的 PubSub 消息触发。需要完成以下步骤:

  1. 检查此文件是否未被其他云功能下载
  2. 更新 firestore 文档 - 我们开始下载过程
  3. 获取配置详细信息(来自 GCS 中的 json 文件)
  4. 获取连接详细信息(来自 Secret Manager)
  5. 连接并下载
  6. 将下载的文件保存到目标 GCS 存储桶中
  7. 更新 firestore 文档 - 我们完成了下载过程

部署此函数时 - “最大实例”参数的值在 10 到 30 之间(根据我的经验)。

这是一个非常简短的描述,基于最简单的假设(即您没有大于 100Mb 的文件 or/and 连接良好)。

一些额外的注意事项。

1/ 准确的记录。 Json 具有一致字段的结构将被定期记录。我建议制作一个接收器,以便可以在 BigQuery 表中分析日志。

2/ 服务帐户和 IAM。所有这些都应该 运行 在仅用于给定组件的自定义服务帐户下。将提供相关的 IAM 角色。

3/ 云 NAT。 SFTP(以我的经验)仅适用于特定的静态 IP 地址(它们不允许来自任何地址的连接)。因此,网络、子网、IP 地址、路由器、NAT——所有这些都需要创建和配置。 IP 地址将提供给 SFTP 服务器所有者,以允许访问。将使用“vpc 连接器”参数部署的云功能。

4/ 进度和监控 - 3 个信息来源 - firestore 收集、Stackdriver 日志、BigQuery 表。

同样,这是我脑海中的一个非常简化的描述。如果您有具体问题或想讨论,请告诉我。