在停止的地方重新启动 DynamoDB 批处理写入作业
Restarting DynamoDB batch write job where it left off
我正在使用 boto3 batch_writer 上下文将大型 pandas DataFrame 加载到 DynamoDB table 中。哈希键是 symbol
,排序键是 date
。
with table.batch_writer() as batch:
for row in df.itertuples(index=False, name="R"):
batch.put_item(Item=row)
我的网络连接中断,作业停止。我想从中断的地方开始记录。
DynamoDB table 有 1_400_899 个项目。我的 DataFrame 有 5_998_099 行。
当我检查索引 1_400_899 处和周围的 DataFrame 时,DynamoDB 中不存在这些记录。这让我觉得行不是按顺序插入的。
我如何确定我离开的地方,以便我可以适当地分割 DataFrame 并重新启动作业?
Dynamodb 的 put_item
不保证项目将按顺序插入,因此您不能依赖项目插入的顺序。现在,回到您的问题 我如何确定我离开的地方,以便我可以适当地分割 DataFrame 并重新启动作业?
唯一确定的方法是扫描整个 table 并检索已插入的主键列的值,然后从原始数据框中删除这些键并再次开始批量写入操作。
下面是一些可以帮助您完成工作的代码:
def scan_table(table, keys, **kwargs):
resp = table.scan(ProjectionExpression=', '.join(keys), **kwargs)
yield from resp['Items']
if 'LastEvaluatedKey' in resp:
yield from scan_table(table, keys, ExclusiveStartKey=resp['LastEvaluatedKey'])
keys = ['symbol', 'date']
df_saved = pd.DataFrame(scan_table(table, keys))
i1 = df.set_index(keys).index
i2 = df_saved.set_index(keys).index
df_not_saved = df[~i1.isin(i2)]
现在您可以在 df_not_saved
而不是 df
上重新启动批量写入操作
with table.batch_writer() as batch:
for row in df_not_saved.itertuples(index=False, name="R"):
batch.put_item(Item=row)
Python batch_writer()
是围绕 DynamoDB 的 BatchWriteItem 操作的实用程序。它将您的工作分成较小的项目集(BatchWriteItem 限制为 25 个项目),并使用 BatchWriteItem
.
写入每个批次
通常,这些写入 在某种意义上是 顺序的:如果您的客户端设法将一批写入发送到 DynamoDB,它们将全部完成,即使您失去连接.但是,这里有一个问题:BatchWriteItem
不能保证成功写入所有项目。如果不能,通常是因为您使用的容量超过了预留容量,它会 returns UnprocessedItems
- 需要重新发送的项目列表。 batch_writer()
稍后会重新发送这些项目 - 但如果您在此时中断它 - 可能会写入最后 25 个项目中的随机子集,但不是全部。因此,请确保至少返回 25 项,以确保您已到达 batch_writer() 成功写入所有内容的位置。
另一个问题是你从哪里得到的 DynamoDB table 有 1_400_899 项的信息。 DynamoDB 确实有这样的数字,但他们声称它每 6 小时仅更新一次。你等了6个小时了吗?
我正在使用 boto3 batch_writer 上下文将大型 pandas DataFrame 加载到 DynamoDB table 中。哈希键是 symbol
,排序键是 date
。
with table.batch_writer() as batch:
for row in df.itertuples(index=False, name="R"):
batch.put_item(Item=row)
我的网络连接中断,作业停止。我想从中断的地方开始记录。
DynamoDB table 有 1_400_899 个项目。我的 DataFrame 有 5_998_099 行。
当我检查索引 1_400_899 处和周围的 DataFrame 时,DynamoDB 中不存在这些记录。这让我觉得行不是按顺序插入的。
我如何确定我离开的地方,以便我可以适当地分割 DataFrame 并重新启动作业?
Dynamodb 的 put_item
不保证项目将按顺序插入,因此您不能依赖项目插入的顺序。现在,回到您的问题 我如何确定我离开的地方,以便我可以适当地分割 DataFrame 并重新启动作业?
唯一确定的方法是扫描整个 table 并检索已插入的主键列的值,然后从原始数据框中删除这些键并再次开始批量写入操作。
下面是一些可以帮助您完成工作的代码:
def scan_table(table, keys, **kwargs):
resp = table.scan(ProjectionExpression=', '.join(keys), **kwargs)
yield from resp['Items']
if 'LastEvaluatedKey' in resp:
yield from scan_table(table, keys, ExclusiveStartKey=resp['LastEvaluatedKey'])
keys = ['symbol', 'date']
df_saved = pd.DataFrame(scan_table(table, keys))
i1 = df.set_index(keys).index
i2 = df_saved.set_index(keys).index
df_not_saved = df[~i1.isin(i2)]
现在您可以在 df_not_saved
而不是 df
with table.batch_writer() as batch:
for row in df_not_saved.itertuples(index=False, name="R"):
batch.put_item(Item=row)
Python batch_writer()
是围绕 DynamoDB 的 BatchWriteItem 操作的实用程序。它将您的工作分成较小的项目集(BatchWriteItem 限制为 25 个项目),并使用 BatchWriteItem
.
通常,这些写入 在某种意义上是 顺序的:如果您的客户端设法将一批写入发送到 DynamoDB,它们将全部完成,即使您失去连接.但是,这里有一个问题:BatchWriteItem
不能保证成功写入所有项目。如果不能,通常是因为您使用的容量超过了预留容量,它会 returns UnprocessedItems
- 需要重新发送的项目列表。 batch_writer()
稍后会重新发送这些项目 - 但如果您在此时中断它 - 可能会写入最后 25 个项目中的随机子集,但不是全部。因此,请确保至少返回 25 项,以确保您已到达 batch_writer() 成功写入所有内容的位置。
另一个问题是你从哪里得到的 DynamoDB table 有 1_400_899 项的信息。 DynamoDB 确实有这样的数字,但他们声称它每 6 小时仅更新一次。你等了6个小时了吗?