GCP Cloud Tasks:缩短创建先前创建的命名任务的周期

GCP Cloud Tasks: shorten period for creating a previously created named task

我们正在开发一个基于 GCP Cloud Task 的队列进程,该进程会在特定 Firestore 文档写入触发器触发时发送状态电子邮件。我们使用 Cloud Tasks 的原因是可以在发送电子邮件之前创建延迟(在未来使用 scheduledTime 属性 2 分钟),并控制重复数据删除(通过使用格式如下的任务名称:[firestore -collection-name]-[doc-id]) 因为 Firestore 文档上的 'write' 触发器可以在创建文档时触发多次,然后由后端云功能快速更新。

一旦达到任务的延迟期,云任务就会运行,并发送包含更新的 Firestore 文档信息的电子邮件。之后任务从队列中删除,一切正常。

除了:

如果用户更新 Firestore 文档(比如 20 或 30 分钟后),我们想重新发送状态电子邮件,但无法使用相同的任务名称创建任务。我们收到以下错误:

409 The task cannot be created because a task with this name existed too recently. For more information about task de-duplication see https://cloud.google.com/tasks/docs/reference/rest/v2/projects.locations.queues.tasks/create#body.request_body.FIELDS.task.

这出乎意料,因为最后一个任务已成功完成,此时队列为空。错误消息中引用的文档说:

If the task's queue was created using Cloud Tasks, then another task with the same name can't be created for ~1hour after the original task was deleted or executed.

问题:有没有什么方法可以通过减少时间量,甚至完全取消限制来绕过这个限制?

简短的回答是否定的。正如您已经指出的,文档对这种行为非常清楚,您应该等待 1 小时才能创建一个与之前创建的任务同名的任务。 API 或客户端库不允许减少此时间。

话虽如此,我建议不要使用相同的任务 ID,而是为任务使用不同的任务 ID,并在请求正文中添加标识符。例如,使用 Python:

from google.cloud import tasks_v2
from google.protobuf import timestamp_pb2
import datetime

def create_task(project, queue, location, payload=None, in_seconds=None):
    client = tasks_v2.CloudTasksClient()

    parent = client.queue_path(project, location, queue)

    task = {
            'app_engine_http_request': {
                'http_method': 'POST',
                'relative_uri': '/task/'+queue
            }
    }
    if payload is not None:
        converted_payload = payload.encode()

        task['app_engine_http_request']['body'] = converted_payload

    if in_seconds is not None:
        d = datetime.datetime.utcnow() + datetime.timedelta(seconds=in_seconds)

        timestamp = timestamp_pb2.Timestamp()
        timestamp.FromDatetime(d)

        task['schedule_time'] = timestamp

    response = client.create_task(parent, task)

    print('Created task {}'.format(response.name))
    print(response)

#You can change DOCUMENT_ID with USER_ID or something to identify the task
create_task(PROJECT_ID, QUEUE, REGION, DOCUMENT_ID)