Checking if a file exists in a Google Cloud Storage bucket via Apache Airflow?
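I have a DAG that takes the output of a script from a Google Cloud Storage bucket, loads it into a table in Google BigQuery, and then deletes the file from the bucket.
I want the DAG to check every hour on weekends. Right now I'm using GoogleCloudStoragetoBigQueryOperator, and if the file doesn't exist, the DAG fails. Is there a way to set up the DAG so that it doesn't fail when the file isn't there? Maybe with try/catch?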
You can use GCSObjectExistenceSensor from the Google provider package to verify that the file exists before running the downstream tasks.
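from airflow.providers.google.cloud.sensors.gcs import GCSObjectExistenceSensor

# BUCKET_1 and PATH_TO_UPLOAD_FILE are placeholders for your bucket name and object path.
gcs_object_exists = GCSObjectExistenceSensor(
    bucket=BUCKET_1,
    object=PATH_TO_UPLOAD_FILE,
    mode='poke',  # keep checking until the object appears or the timeout is hit
    task_id="gcs_object_exists_task",
)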
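To make the effect of this setup concrete, here is a toy, stdlib-only model of the DAG from the question (existence check, then the BigQuery load, then the delete). It is not Airflow code: the set standing in for the bucket, the list standing in for the table, and the task names `load_to_bq` and `delete_file` are hypothetical, and the whole poke/timeout cycle is collapsed into a single check. It assumes the sensor is configured with `soft_fail=True` and that the downstream tasks keep Airflow's default `all_success` trigger rule, under which a skipped upstream task causes its downstream tasks to be skipped as well.

```python
from typing import Dict, List, Set


def run_pipeline(bucket: Set[str], path: str, table: List[str]) -> Dict[str, str]:
    """Toy model of: existence check (soft_fail=True) >> load >> delete.

    `bucket` stands in for the GCS bucket and `table` for the BigQuery
    table; both, like the task names below, are hypothetical.
    """
    states: Dict[str, str] = {}

    # Stand-in for GCSObjectExistenceSensor with soft_fail=True: a missing
    # object ends as "skipped" instead of "failed".
    states["gcs_object_exists_task"] = "success" if path in bucket else "skipped"

    # With the default all_success trigger rule, a skipped upstream task
    # causes the downstream tasks to be skipped too.
    if states["gcs_object_exists_task"] == "skipped":
        states["load_to_bq"] = "skipped"
        states["delete_file"] = "skipped"
        return states

    table.append(path)    # stand-in for the GCS -> BigQuery load
    states["load_to_bq"] = "success"
    bucket.discard(path)  # stand-in for deleting the object afterwards
    states["delete_file"] = "success"
    return states


bucket = {"results/output.csv"}
table: List[str] = []
print(run_pipeline(bucket, "results/output.csv", table))  # file present
print(run_pipeline(bucket, "results/output.csv", table))  # file already deleted
```

The first call finds the file, "loads" it and deletes it, so all three tasks succeed. The second call behaves like the next hourly run after the file was removed: every task ends as skipped rather than failed, which is the behaviour the question asks for.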
You can find the official example here. Keep in mind that GCSObjectExistenceSensor extends BaseSensorOperator, so you can set parameters such as poke_interval, timeout and mode as needed:
- soft_fail (bool) – Set to True to mark the task as SKIPPED instead of FAILED on failure.
- poke_interval (float) – Time, in seconds, that the job should wait between tries.
- timeout (float) – Time, in seconds, before the task times out and fails.
- mode (str) – How the sensor operates. Options are: { poke | reschedule }, default is poke. When set to poke, the sensor takes up a worker slot for its whole execution time and sleeps between pokes. Use this mode if the expected runtime of the sensor is short or if a short poke interval is required. Note that in this mode the sensor holds onto a worker slot and a pool slot for the duration of its runtime. When set to reschedule, the sensor task frees the worker slot when the criteria are not yet met and is rescheduled at a later time. Use this mode if the time before the criteria are met is expected to be quite long. The poke interval should be more than one minute to prevent too much load on the scheduler.
- exponential_backoff (bool) – Allow progressively longer waits between pokes by using an exponential backoff algorithm.
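The following is a simplified, stdlib-only model of how these parameters interact in poke mode; it is a sketch for intuition, not Airflow's BaseSensorOperator implementation. `run_poke_sensor`, `SensorTimeout` and `FakeClock` are hypothetical names, and the plain doubling used for exponential_backoff is an assumption (Airflow's own backoff computation differs in detail). Injecting the clock and sleep functions lets the example run instantly.

```python
import time
from typing import Callable, List


class SensorTimeout(Exception):
    """Raised when the condition is still unmet after `timeout` seconds."""


def run_poke_sensor(
    poke: Callable[[], bool],
    poke_interval: float,
    timeout: float,
    soft_fail: bool = False,
    exponential_backoff: bool = False,
    clock: Callable[[], float] = time.monotonic,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Simplified model of a poke-mode sensor. Returns "success" or "skipped"."""
    start = clock()
    attempt = 0
    while not poke():
        elapsed = clock() - start
        if elapsed > timeout:
            if soft_fail:
                return "skipped"  # soft_fail=True: SKIPPED instead of FAILED
            raise SensorTimeout(f"condition not met after {elapsed:.0f}s")
        attempt += 1
        # Plain doubling is an illustrative simplification of exponential backoff.
        wait = poke_interval * 2 ** (attempt - 1) if exponential_backoff else poke_interval
        sleep(wait)  # in poke mode the worker slot stays occupied while sleeping
    return "success"


class FakeClock:
    """Deterministic clock so the model can be exercised without real waiting."""

    def __init__(self) -> None:
        self.now = 0.0
        self.waits: List[float] = []

    def time(self) -> float:
        return self.now

    def sleep(self, seconds: float) -> None:
        self.waits.append(seconds)
        self.now += seconds


# The file shows up on the third poke.
answers = iter([False, False, True])
fake = FakeClock()
result = run_poke_sensor(lambda: next(answers), poke_interval=60, timeout=3600,
                         exponential_backoff=True, clock=fake.time, sleep=fake.sleep)
print(result, fake.waits)
```

With the file appearing on the third poke, the model waits 60 s and then 120 s before succeeding, so it prints `success [60, 120]`. If the file never appears, once the elapsed time exceeds timeout it returns "skipped" when soft_fail=True and raises SensorTimeout otherwise.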