从 azure databricks 中的多任务作业写入分区增量 Table 时出错

Error writing a partitioned Delta Table from a multitasking job in azure databricks

我有一个笔记本,上面写了一个 delta table,其语句类似于以下内容:

match = "current.country = updates.country and current.process_date = updates.process_date"
deltaTable = DeltaTable.forPath(spark, silver_path)
deltaTable.alias("current")\
.merge(
    data.alias("updates"),
    match) \
  .whenMatchedUpdate(
      set = update_set,
      condition = condition) \
  .whenNotMatchedInsert(values = values_set)\
  .execute()

多任务作业有两个并行执行的任务。

执行作业时显示以下错误:

ConcurrentAppendException: Files were added to partition [country=Panamá, process_date=2022-01-01 00:00:00] by a concurrent update. Please try the operation again.

在每个任务中我发送不同的国家(巴拿马,厄瓜多尔)和相同的日期作为参数,所以执行时只需要写入与发送国家对应的信息。 此增量 table 按国家和 process_date 字段划分。 任何想法我做错了什么? 使用“merge”语句时应该如何指定受影响的分区?

如果您能说明在这些情况下我应该如何使用分区,我将不胜感激,因为这对我来说是新的。

更新: 我根据指示 here (ConcurrentAppendException) 对条件进行了调整,以指定国家和处理日期。 现在我收到以下错误消息:

ConcurrentAppendException: Files were added to the root of the table by a concurrent update. Please try the operation again.

我想不出是什么导致了这个错误。继续调查。

Error – ConcurrentAppendException: Files were added to the root of the table by a concurrent update. Please try the operation again.

此异常通常在并发 DELETE、UPDATE 或 MERGE 操作期间抛出。虽然并发操作可能在物理上更新不同的分区目录,但其中一个可能会读取另一个同时更新的同一分区,从而导致冲突。 您可以通过在操作条件中明确分隔来避免这种情况。

当在映射中使用 'Update Strategy' 转换时,将为 Delta Lake 目标 table 执行更新查询。当多个更新策略转换用于同一目标时 table,多个更新查询将并行执行,因此目标数据将是不可预测的table。 由于并发 UPDATE 查询的 Delta Lake 目标中的 unpredictable 数据场景,不支持在映射中每个 'Databricks Delta Lake Table' 使用多个 'Update Strategy' 转换。 重新设计映射,使每个 Delta Lake table.

有一个 'Update Strategy' 转换

解-

虽然 运行 每个 Databricks Delta Lake table 具有一个 'Update Strategy' 转换的映射 table,但执行将成功完成。

参考 - https://docs.delta.io/latest/concurrency-control.html#avoid-conflicts-using-partitioning-and-disjoint-command-conditions

最初,受影响的 table 只有一个日期字段作为分区。所以我用国家和日期字段对它进行了分区。 这个新分区创建了国家和日期目录,但是日期分区的旧目录仍然存在并且没有被删除。

显然,这些目录在尝试并发读取时引起了冲突。 我在具有正确分区的另一条路径上创建了一个新的增量,然后将其替换在原始路径上。这允许删除旧的分区目录。

执行这些操作的唯一后果是我丢失了 table(时间旅行)的更改历史记录。