子工作流的输入大小时超时异常

Timeout exception when size of the input to child workflow is huge

16:37:21.945 [Workflow Executor taskList="PullFulfillmentsTaskList", domain="test-domain": 3] WARN com.uber.cadence.internal.common.Retryer - Retrying after failure
org.apache.thrift.transport.TTransportException: Request timeout after 1993ms
    at com.uber.cadence.serviceclient.WorkflowServiceTChannel.throwOnRpcError(WorkflowServiceTChannel.java:546)
    at com.uber.cadence.serviceclient.WorkflowServiceTChannel.doRemoteCall(WorkflowServiceTChannel.java:519)
    at com.uber.cadence.serviceclient.WorkflowServiceTChannel.respondDecisionTaskCompleted(WorkflowServiceTChannel.java:962)
    at com.uber.cadence.serviceclient.WorkflowServiceTChannel.lambda$RespondDecisionTaskCompleted(WorkflowServiceTChannel.java:951)
    at com.uber.cadence.serviceclient.WorkflowServiceTChannel.measureRemoteCall(WorkflowServiceTChannel.java:569)
    at com.uber.cadence.serviceclient.WorkflowServiceTChannel.RespondDecisionTaskCompleted(WorkflowServiceTChannel.java:949)
    at com.uber.cadence.internal.worker.WorkflowWorker$TaskHandlerImpl.lambda$sendReply[=10=](WorkflowWorker.java:301)
    at com.uber.cadence.internal.common.Retryer.lambda$retry[=10=](Retryer.java:104)
    at com.uber.cadence.internal.common.Retryer.retryWithResult(Retryer.java:122)
    at com.uber.cadence.internal.common.Retryer.retry(Retryer.java:101)
    at com.uber.cadence.internal.worker.WorkflowWorker$TaskHandlerImpl.sendReply(WorkflowWorker.java:301)
    at com.uber.cadence.internal.worker.WorkflowWorker$TaskHandlerImpl.handle(WorkflowWorker.java:261)
    at com.uber.cadence.internal.worker.WorkflowWorker$TaskHandlerImpl.handle(WorkflowWorker.java:229)
    at com.uber.cadence.internal.worker.PollTaskExecutor.lambda$process[=10=](PollTaskExecutor.java:71)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)

我们的父工作流代码基本是这样的(JSONObject来自org.json)

JSONObject[] array = restActivities.getArrayWithHugeJSONItems();
for(JSONObject hugeJSON: array) {
  ChildWorkflow child = Workflow.newChildWorkflowStub(ChildWorkflow.class);
  child.run(hugeJSON);
}

我们发现,大多数时候,父工作流worker无法启动子工作流并抛出上述超时异常。它疯狂地重试但从未成功并一遍又一遍地打印超时异常。然而,有时我们很幸运并且它有效。有时它甚至更早地在 activity worker 上失败,并抛出相同的异常。我们认为这是由于数据量过大(大约 5MB),无法在超时时间内发送(从日志来看我们猜测它设置为 2s)。如果我们用小的假数据调用 child.run,它 100% 有效。

我们使用子工作流的原因是我们想并行使用 Async.function 到 运行 它们。那么我们如何解决这个问题呢?是否有我们应该增加的节俭超时配置,或者我们可以通过某种方式避免传递大量数据?

提前致谢!

---Maxim回答后更新---

谢谢。我阅读了示例,但对我的用例仍有一些疑问。假设我的 RestActivitiesWorker 中有一个包含 100 个巨大 JSON 对象的数组,如果我不应该 return 工作流中的这个巨大数组,我需要对数据库进行 100 次调用以创建 100 行记录并将 100 个 id 放入一个数组中并将其传回工作流。然后工作流为每个 id 创建一个子工作流。然后,每个子工作流调用另一个具有 id 的 activity 以从数据库加载数据。但是 activity 必须将那个巨大的 JSON 传递给子工作流,这样可以吗?而对于 RestActivitiesWorker 向数据库中进行 100 次插入,如果它在中间失败了怎么办?

我想这归结为我们的工作流程正试图直接处理巨大的 JSON。我们正在尝试从外部系统将巨大的 JSON(5-30MB,不是那么大)加载到我们的系统中。我们稍微分解一下 JSON ,操作一些值,并使用来自几个字段的值来做一些不同的逻辑,最后将它保存在我们的数据库中。我们应该如何使用 Temporal 执行此操作?

Temporal/Cadence 不支持将大 blob 作为输入和输出传递,因为它使用数据库作为底层存储。所以你想改变你的应用程序的架构来避免这种情况。

标准解决方法是:

  • 使用外部 blob 存储来保存大数据并将引用作为参数传递给它。
  • 在工作进程甚至主机磁盘上缓存数据,并将操作此数据的活动路由到该进程或主机。请参阅此方法的文件处理示例。