Massively parallel search operation with Dask, Distributed
I created a demo problem while testing an auto-scaling Dask Distributed implementation on Kubernetes and AWS, and I'm not sure I'm tackling it correctly.
My scenario is: given the md5 hash of a string (representing a password), find the original string. I hit three main problems.
A) The parameter space is massive; trying to create a dask bag with 2.8211099e+12 members caused memory issues (hence the 'explode' function you'll see in the sample code below).
B) Clean exit on early find. I think using take(1, npartitions=-1) will achieve this, but I wasn't sure. Originally I raised an exception, raise Exception("%s is your answer" % test_str), which worked but felt "dirty".
C) Given this is long-running and sometimes workers or AWS boxes die, how would it be best to store progress?
Sample code:
import distributed
import math
import dask.bag as db
import hashlib
import dask
import os

# Connect to a remote scheduler when SCHED_URL is set; otherwise run locally.
if os.environ.get('SCHED_URL', False):
    sched_url = os.environ['SCHED_URL']
    client = distributed.Client(sched_url)
    versions = client.get_versions(True)
    dask.set_options(get=client.get)

difficulty = 'easy'

settings = {
    'hard': (hashlib.md5('welcome1'.encode('utf-8')).hexdigest(), 'abcdefghijklmnopqrstuvwxyz1234567890', 8),
    'mid-hard': (hashlib.md5('032abgh'.encode('utf-8')).hexdigest(), 'abcdefghijklmnop1234567890', 7),
    'mid': (hashlib.md5('b08acd'.encode('utf-8')).hexdigest(), '0123456789abcdef', 6),
    'easy': (hashlib.md5('0812'.encode('utf-8')).hexdigest(), '0123456789', 4)
}

hashed_pw, keyspace, max_guess_length = settings[difficulty]

def is_pw(guess):
    return hashlib.md5(guess.encode('utf-8')).hexdigest() == hashed_pw

def guess(n):
    """Map an integer to a candidate password over the keyspace."""
    guess = ''
    size = len(keyspace)
    while n > 0:
        n -= 1
        guess += keyspace[n % size]
        n = math.floor(n / size)
    return guess

def make_exploder(num_partitions, max_val):
    """Creates a function that maps an int to a range, based on the maximum
    value aimed for and the number of partitions that are expected.
    Used in this code with map and flatten to take a short list
    (e.g. 1->1e6) to a large one (1->1e20) in dask rather than on the host machine."""
    steps = math.ceil(max_val / num_partitions)
    def explode(partition):
        return range(partition * steps, partition * steps + steps)
    return explode

max_val = len(keyspace) ** max_guess_length  # how many possible password permutations there are
partitions = math.floor(max_val / 100)
partitions = partitions if partitions < 100000 else 100000  # split into at most 100,000 partitions; too many partitions caused issues (memory, I think)

# Sort of the opposite of a reduce. make_exploder(10, 100)(3) => [30, 31, ..., 39].
# Expands the problem back into the full problem space.
exploder = make_exploder(partitions, max_val)

print("max val: %s, partitions: %s" % (max_val, partitions))

search = (db.from_sequence(range(partitions), npartitions=partitions)
            .map(exploder).flatten()
            .filter(lambda i: i <= max_val)
            .map(guess).filter(is_pw))

search.take(1, npartitions=-1)
I find 'easy' works well locally, and 'mid-hard' works well on our 6-to-8 * m4.2xlarge AWS cluster, but so far 'hard' hasn't worked.
A) The parameter space is massive; trying to create a dask bag with 2.8211099e+12 members caused memory issues (hence the 'explode' function you'll see in the sample code below).
This depends strongly on how you arrange your elements into a bag. If each element is in its own partition then yes, this will certainly kill everything; 1e12 partitions is very expensive. I recommend keeping the number of partitions in the thousands or tens of thousands.
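As a rough sketch of that shape (the names N, npartitions and chunk are illustrative, not from the question or dask's API): put one small integer in each of a few thousand partitions and let each expand into a large range on the workers, much like the question's explode function.

import math
import dask.bag as db

N = 36 ** 8            # the 'hard' search space, about 2.82e12 candidates
npartitions = 10000    # thousands of partitions, not one per element
chunk = math.ceil(N / npartitions)

# One small int per partition; each expands to a chunk-sized range on a worker,
# so the client never materialises the full ~1e12-element sequence.
candidates = (db.from_sequence(range(npartitions), npartitions=npartitions)
                .map(lambda p: range(p * chunk, min((p + 1) * chunk, N)))
                .flatten())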
B) Clean exit on early find. I think using take(1, npartitions=-1) will achieve this, but I wasn't sure. Originally I raised an exception, raise Exception("%s is your answer" % test_str), which worked but felt "dirty".
If you want this, then I recommend not using dask.bag, but instead using the concurrent.futures interface, and in particular the as_completed iterator.
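A minimal sketch of that approach, assuming guess and is_pw from the question are importable on the workers and reusing the illustrative N/npartitions/chunk names from above; search_chunk is a hypothetical helper, not part of dask:

from distributed import Client, as_completed

client = Client(sched_url)  # sched_url as in the question's setup

def search_chunk(start, stop):
    """Scan one slice of the integer space; return the password or None."""
    for n in range(start, stop):
        candidate = guess(n)
        if is_pw(candidate):
            return candidate
    return None

futures = [client.submit(search_chunk, p * chunk, min((p + 1) * chunk, N))
           for p in range(npartitions)]

for future in as_completed(futures):
    result = future.result()
    if result is not None:
        print("%s is your answer" % result)
        client.cancel(futures)  # drop any chunks that haven't started yet
        break

as_completed yields futures as they finish, so you get a clean early exit, and cancelling the remaining futures tells the scheduler to drop the unfinished work instead of raising an exception through the graph.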
C) Given this is long-running and sometimes workers or AWS boxes die, how would it be best to store progress?
Dask should be resilient to this, as long as you can guarantee that the scheduler survives. If you use the concurrent.futures interface rather than dask bag, you can also track intermediate results on the client process.
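One possible pattern, building on the as_completed sketch above: keep a set of finished chunk indices on the client and flush it to disk periodically, so a restarted run can skip chunks that are already done. The progress.json filename and the every-100-chunks cadence are assumptions for illustration:

import json
import os

CHECKPOINT = 'progress.json'  # hypothetical checkpoint file on the client

done = set()
if os.path.exists(CHECKPOINT):
    with open(CHECKPOINT) as f:
        done = set(json.load(f))

# Only submit the chunks that a previous run hasn't already finished.
futures = {client.submit(search_chunk, p * chunk, min((p + 1) * chunk, N)): p
           for p in range(npartitions) if p not in done}

for future in as_completed(futures):
    p = futures[future]
    result = future.result()
    if result is not None:
        print("found %s in chunk %d" % (result, p))
        break
    done.add(p)
    if len(done) % 100 == 0:  # flush progress every 100 finished chunks
        with open(CHECKPOINT, 'w') as f:
            json.dump(sorted(done), f)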