在停止的地方重新启动 DynamoDB 批处理写入作业

Restarting DynamoDB batch write job where it left off

我正在使用 boto3 batch_writer 上下文将大型 pandas DataFrame 加载到 DynamoDB table 中。哈希键是 symbol,排序键是 date

with table.batch_writer() as batch:
    for row in df.itertuples(index=False, name="R"):
        batch.put_item(Item=row)

我的网络连接中断,作业停止。我想从中断的地方开始记录。

DynamoDB table 有 1_400_899 个项目。我的 DataFrame 有 5_998_099 行。

当我检查索引 1_400_899 处和周围的 DataFrame 时,DynamoDB 中不存在这些记录。这让我觉得行不是按顺序插入的。

我如何确定我离开的地方,以便我可以适当地分割 DataFrame 并重新启动作业?

Dynamodb 的 put_item 不保证项目将按顺序插入,因此您不能依赖项目插入的顺序。现在,回到您的问题 我如何确定我离开的地方,以便我可以适当地分割 DataFrame 并重新启动作业?

唯一确定的方法是扫描整个 table 并检索已插入的主键列的值,然后从原始数据框中删除这些键并再次开始批量写入操作。

下面是一些可以帮助您完成工作的代码:

def scan_table(table, keys, **kwargs):
    resp = table.scan(ProjectionExpression=', '.join(keys), **kwargs)
    yield from resp['Items']
    if 'LastEvaluatedKey' in resp:
        yield from scan_table(table, keys, ExclusiveStartKey=resp['LastEvaluatedKey'])


keys = ['symbol', 'date']
df_saved = pd.DataFrame(scan_table(table, keys))

i1 = df.set_index(keys).index
i2 = df_saved.set_index(keys).index

df_not_saved = df[~i1.isin(i2)]

现在您可以在 df_not_saved 而不是 df

上重新启动批量写入操作
with table.batch_writer() as batch:
    for row in df_not_saved.itertuples(index=False, name="R"):
        batch.put_item(Item=row)

Python batch_writer() 是围绕 DynamoDB 的 BatchWriteItem 操作的实用程序。它将您的工作分成较小的项目集(BatchWriteItem 限制为 25 个项目),并使用 BatchWriteItem.

写入每个批次

通常,这些写入 在某种意义上是 顺序的:如果您的客户端设法将一批写入发送到 DynamoDB,它们将全部完成,即使您失去连接.但是,这里有一个问题:BatchWriteItem 不能保证成功写入所有项目。如果不能,通常是因为您使用的容量超过了预留容量,它会 returns UnprocessedItems - 需要重新发送的项目列表。 batch_writer() 稍后会重新发送这些项目 - 但如果您在此时中断它 - 可能会写入最后 25 个项目中的随机子集,但不是全部。因此,请确保至少返回 25 项,以确保您已到达 batch_writer() 成功写入所有内容的位置。

另一个问题是你从哪里得到的 DynamoDB table 有 1_400_899 项的信息。 DynamoDB 确实有这样的数字,但他们声称它每 6 小时仅更新一次。你等了6个小时了吗?