How do I make part of the script asynchronous with dask?
Suppose I've got a set of documents. I need to tokenize them and then turn them into vectors for further work. Having found that elasticsearch's tokenizer works much better than my own solution, I'm switching to it. However, it is considerably slower. The end result is then expected to be fed into the vectorizer as a stream.
The whole process can be done with a chain of generators:
def fetch_documents(_cursor):
    with _cursor:
        # a lot of documents expected, may not fit in memory
        _cursor.execute('select ... from ...')
        for doc in _cursor:
            yield doc

def tokenize(documents):
    for doc in documents:
        yield elasticsearch_tokenize_me(doc)

def build_model(documents):
    some_model = SomeModel()
    for doc in documents:
        some_model.add_document(doc)
    return some_model

build_model(tokenize(fetch_documents(cursor)))  # cursor: an open DB cursor
So this basically works fine, but it doesn't utilize all the available processing power. Since dask is used in other, related projects, I tried adapting the code and ended up with this (I'm using psycopg2 for database access):
from itertools import chain

from dask import delayed
import psycopg2
import psycopg2.extras
from elasticsearch import Elasticsearch
from elasticsearch.client import IndicesClient

def loader():
    conn = psycopg2.connect()
    cur = conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
    cur.execute('''
        SELECT document, ... FROM ...
        ''')
    return cur

@delayed
def tokenize(partition):
    result = []
    client = IndicesClient(Elasticsearch())
    for row in partition:
        _result = client.analyze(analyzer='standard', text=row['document'])
        result.append(dict(row,
                           tokens=tuple(item['token'] for item in _result['tokens'])))
    return result

@delayed
def build_model(sequence_of_data):
    some_model = SomeModel()
    for item in chain.from_iterable(sequence_of_data):
        some_model.add_document(item)
    return some_model

with loader() as cur:
    partitions = []
    for idx_start in range(0, cur.rowcount, 200):
        partitions.append(delayed(cur.fetchmany)(200))
    tokenized = []
    for partition in partitions:
        tokenized.append(tokenize(partition))
    result = build_model(tokenized)
    result.compute()
The code more or less works, except that in the end every document is tokenized before any of them is fed into the model. While this is fine for smaller collections of data, it isn't for huge amounts of data because of the massive memory consumption. Should I just use plain concurrent.futures, or am I using dask incorrectly?
Using plain concurrent.futures just for the work:
from concurrent.futures import ProcessPoolExecutor

import psycopg2
import psycopg2.extras
from elasticsearch import Elasticsearch
from elasticsearch.client import IndicesClient

def loader():
    conn = psycopg2.connect()
    cur = conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
    cur.execute('''
        SELECT document, ... FROM ...
        ''')
    return cur

def tokenize(partition):
    result = []
    client = IndicesClient(Elasticsearch())
    for row in partition:
        _result = client.analyze(analyzer='standard', text=row['document'])
        result.append(dict(row,
                           tokens=tuple(item['token'] for item in _result['tokens'])))
    return result

def do_something(partitions, total):
    some_model = SomeModel()
    for partition in partitions:
        result = partition.result()
        for item in result:
            some_model.add_document(item)
    return some_model

with loader() as cur, \
        ProcessPoolExecutor(max_workers=8) as executor:
    print(cur.rowcount)
    partitions = []
    for idx_start in range(0, cur.rowcount, 200):
        partitions.append(executor.submit(tokenize,
                                          cur.fetchmany(200)))
    do_something(partitions, cur.rowcount)
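Here the model consumes each partition's result as soon as it is retrieved, in submission order. If the order in which partitions reach the model doesn't matter, a small variation (a sketch only, reusing the SomeModel placeholder from above) could use concurrent.futures.as_completed to fold in results in whatever order the workers finish:

from concurrent.futures import as_completed

def do_something_unordered(partitions):
    # Fold tokenized partitions into the model as soon as each
    # worker finishes, regardless of submission order.
    some_model = SomeModel()
    for future in as_completed(partitions):
        for item in future.result():
            some_model.add_document(item)
    return some_model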
One straightforward solution would be to load the data locally on your machine (it's hard to partition a single SQL query) and then send it to the dask cluster for the expensive tokenization step. Perhaps something like the following:
cur.execute(''' SELECT document, ... FROM ... ''')
rows = cur  # psycopg2 cursors are iterable; execute() itself returns None

from toolz import partition_all, concat
partitions = partition_all(10000, rows)

from dask.distributed import Executor  # renamed Client in newer dask.distributed releases
e = Executor('scheduler-address:8786')

futures = []
for part in partitions:
    x = e.submit(tokenize, part)   # tokenize a list of rows on a worker
    y = e.submit(process, x)       # then process that tokenized list
    futures.append(y)

results = e.gather(futures)
result = list(concat(results))
In this example the functions tokenize and process both expect to consume and return a list of elements.
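For concreteness, here is a minimal sketch of what such list-in, list-out functions could look like; process and make_feature_vector are hypothetical stand-ins for whatever transformation feeds your vectorizer, while the Elasticsearch calls simply mirror the tokenize shown earlier:

from elasticsearch import Elasticsearch
from elasticsearch.client import IndicesClient

def tokenize(rows):
    # rows: a list of dict-like rows; returns a list of dicts with tokens added
    client = IndicesClient(Elasticsearch())
    out = []
    for row in rows:
        analyzed = client.analyze(analyzer='standard', text=row['document'])
        out.append(dict(row, tokens=tuple(t['token'] for t in analyzed['tokens'])))
    return out

def process(tokenized_rows):
    # placeholder: turn each tokenized row into whatever the vectorizer needs;
    # make_feature_vector is hypothetical
    return [make_feature_vector(row['tokens']) for row in tokenized_rows]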