Google appengine:任务队列性能
Google appengine: Task queue performance
我目前在 appengine 上有一个应用程序 运行ning,我正在使用延迟库执行一些作业,其中一些任务 运行 每天执行一次,而其中一些每月执行一次.大多数这些任务查询 Datastore 以检索文档,然后将实体存储在索引中(搜索 API)。其中一些表格每月更换一次,我必须 运行 在所有实体 (4~5M) 上执行这些任务。
此类任务的一个示例是:
def addCompaniesToIndex(cursor=None, n_entities=0, mindate=None):
#get index
BATCH_SIZE = 200
cps, next_cursor, more = Company.query().\
fetch_page(BATCH_SIZE,
start_cursor=cursor)
doc_list = []
for i in range(0, len(cps)):
cp = cps[i]
#create a Index Document using the Datastore entity
#this document has only about 5 text fields and one date field
cp_doc = getCompanyDocument(cp)
doc_list.append(cp_doc)
index = search.Index(name='Company')
index.put(doc_list)
n_entities += len(doc_list)
if more:
logging.debug('Company: %d added to index', n_entities)
#to_put[:] = []
doc_list[:] = []
deferred.defer(addCompaniesToIndex,
cursor=next_cursor,
n_entities=n_entities,
mindate=mindate)
else:
logging.debug('Finished Company index creation (%d processed)', n_entities)
当我只 运行 一个任务时,执行每个延迟任务大约需要 4-5 秒,因此索引我的 500 万个实体大约需要 35 小时。
另一件事是,当我 运行 在同一队列上使用不同的延迟任务对另一个索引(例如,每日更新之一)进行更新时,两者的执行速度都会慢很多。并且每个延迟调用开始花费大约 10-15 秒,这是无法忍受的。
我的问题是:有没有一种方法可以更快地执行此操作并每次将推送队列扩展到多个作业 运行ning?或者我应该使用不同的方法来解决这个问题?
提前致谢,
通过将 if more
语句放在 addCompaniesToIndex()
函数的末尾,您实际上是在序列化任务执行:直到当前延迟任务完成对其份额的索引后,才会创建下一个延迟任务文档数量。
您可以做的是在 Company.query().fetch_page()
调用之后立即移动 if more
语句,您可以获得(大部分)下一个延迟任务执行所需的变量。
这样,下一个延迟任务将在当前任务完成之前创建并入队(长),因此它们的处理可能是 overlapping/staggered。您还需要进行一些其他修改,例如处理 n_entities
变量,该变量在更新后的场景中失去了当前的含义 - 但或多或少 cosmetic/informational,对于实际的文档索引操作来说并不是必需的。
如果延迟任务的数量非常多,则存在同时排队太多任务的风险,这可能会导致 "explosion" 派生 GAE 来处理它们的实例数量。在这种情况下,您可以 "throttle" 通过延迟执行延迟任务来生成延迟任务的速率,请参阅 。
我想我终于设法通过使用两个队列和上一个答案提出的想法解决了这个问题。
- 在第一个队列中,我们只查询主要实体(keys_only)。并在第二个队列中为这些键启动另一个任务。然后,第一个任务将在 queue 1 上重新启动,并带有 next_cursor.
- 第二个队列获取实体键并对全文进行所有查询和插入 search/BigQuery/PubSub。 (这很慢~每组 100 个键 15 秒)
我也试过只使用一个队列,但处理吞吐量不是很好。我相信这可能是因为我们在同一个队列和调度程序 might not work as well in this case.
上有慢任务和快任务 运行
我目前在 appengine 上有一个应用程序 运行ning,我正在使用延迟库执行一些作业,其中一些任务 运行 每天执行一次,而其中一些每月执行一次.大多数这些任务查询 Datastore 以检索文档,然后将实体存储在索引中(搜索 API)。其中一些表格每月更换一次,我必须 运行 在所有实体 (4~5M) 上执行这些任务。
此类任务的一个示例是:
def addCompaniesToIndex(cursor=None, n_entities=0, mindate=None):
#get index
BATCH_SIZE = 200
cps, next_cursor, more = Company.query().\
fetch_page(BATCH_SIZE,
start_cursor=cursor)
doc_list = []
for i in range(0, len(cps)):
cp = cps[i]
#create a Index Document using the Datastore entity
#this document has only about 5 text fields and one date field
cp_doc = getCompanyDocument(cp)
doc_list.append(cp_doc)
index = search.Index(name='Company')
index.put(doc_list)
n_entities += len(doc_list)
if more:
logging.debug('Company: %d added to index', n_entities)
#to_put[:] = []
doc_list[:] = []
deferred.defer(addCompaniesToIndex,
cursor=next_cursor,
n_entities=n_entities,
mindate=mindate)
else:
logging.debug('Finished Company index creation (%d processed)', n_entities)
当我只 运行 一个任务时,执行每个延迟任务大约需要 4-5 秒,因此索引我的 500 万个实体大约需要 35 小时。
另一件事是,当我 运行 在同一队列上使用不同的延迟任务对另一个索引(例如,每日更新之一)进行更新时,两者的执行速度都会慢很多。并且每个延迟调用开始花费大约 10-15 秒,这是无法忍受的。
我的问题是:有没有一种方法可以更快地执行此操作并每次将推送队列扩展到多个作业 运行ning?或者我应该使用不同的方法来解决这个问题?
提前致谢,
通过将 if more
语句放在 addCompaniesToIndex()
函数的末尾,您实际上是在序列化任务执行:直到当前延迟任务完成对其份额的索引后,才会创建下一个延迟任务文档数量。
您可以做的是在 Company.query().fetch_page()
调用之后立即移动 if more
语句,您可以获得(大部分)下一个延迟任务执行所需的变量。
这样,下一个延迟任务将在当前任务完成之前创建并入队(长),因此它们的处理可能是 overlapping/staggered。您还需要进行一些其他修改,例如处理 n_entities
变量,该变量在更新后的场景中失去了当前的含义 - 但或多或少 cosmetic/informational,对于实际的文档索引操作来说并不是必需的。
如果延迟任务的数量非常多,则存在同时排队太多任务的风险,这可能会导致 "explosion" 派生 GAE 来处理它们的实例数量。在这种情况下,您可以 "throttle" 通过延迟执行延迟任务来生成延迟任务的速率,请参阅 。
我想我终于设法通过使用两个队列和上一个答案提出的想法解决了这个问题。
- 在第一个队列中,我们只查询主要实体(keys_only)。并在第二个队列中为这些键启动另一个任务。然后,第一个任务将在 queue 1 上重新启动,并带有 next_cursor.
- 第二个队列获取实体键并对全文进行所有查询和插入 search/BigQuery/PubSub。 (这很慢~每组 100 个键 15 秒)
我也试过只使用一个队列,但处理吞吐量不是很好。我相信这可能是因为我们在同一个队列和调度程序 might not work as well in this case.
上有慢任务和快任务 运行