TypeError: cannot pickle '_thread.lock' object Dask compute

I am trying to use Dask for multiprocessing. I have a function that has to run for 10,000 files and generates a file as output for each of them. The function takes a file from an S3 bucket as input and works with another file in S3 that has a similar date and time. I am doing all of this in JupyterLab.

Here is my function:

import gzip
import zipfile

import numpy as np
import pandas as pd
import s3fs
import xarray as xr


def get_temp(file, name):
    
    d=[name[0:4],name[4:6],name[6:8],name[9:11],name[11:13]] 
    f_zip = gzip.decompress(file)
    
    yr=d[0]
    mo=d[1]
    da=d[2]
    hr=d[3]
    mn=d[4]
    
    fs = s3fs.S3FileSystem(anon=True)

    period = pd.Period(str(yr)+str('-')+str(mo)+str('-')+str(da), freq='D')
    # period.dayofyear
    dy=period.dayofyear

    cc=[7,8,9,10,11,12,13,14,15,16]  #look at the IR channels only for now
    dat = xr.open_dataset(f_zip)
    dd=dat[['recNum','trackLat','trackLon', 'temp']]
    dd=dd.to_dataframe()
    dd = dd.dropna()
    dd['num'] = np.arange(len(dd))
    
    l=dd.where((dd.trackLat>-50.0) & (dd.trackLat<50.0) & (dd.trackLon>-110.0) & (dd.trackLon<10.0))
    l = l.dropna() 
    l = l.reset_index()
    
    dy="{0:0=3d}".format(dy)
    
#opening goes data from S3
    F=xr.open_dataset(fs.open(fs.glob('s3://noaa-goes16/ABI-L1b-RadF/'+str(yr)+'/'+str(dy)+'/'+str(hr)+'/'+'OR_ABI-L1b-RadF-M3C07'+'*')[int(mn)//15]))
    
#Converting Lat lon to radiance
    req=F['goes_imager_projection'].semi_major_axis
    oneovf=F['goes_imager_projection'].inverse_flattening
    rpol=F['goes_imager_projection'].semi_minor_axis 
    e = 0.0818191910435
    sat_h=F['goes_imager_projection'].perspective_point_height

    H=req+sat_h
    gc=np.deg2rad(F['goes_imager_projection'].longitude_of_projection_origin)

    phi=np.deg2rad(l.trackLat.values)
    gam=np.deg2rad(l.trackLon.values)

    phic=np.arctan((rpol**2/req**2)*np.tan(phi))
    rc=rpol/np.sqrt((1-e**2*np.cos(phic)**2))
    sx=H-rc*np.cos(phic)*np.cos(gam-gc)
    sy=-rc*np.cos(phic)*np.sin(gam-gc)
    sz=rc*np.sin(phic)

    yy=np.arctan(sz/sx)
    xx=np.arcsin(-sy/(np.sqrt(sx**2+sy**2+sz**2)))
    
    for i in range(len(xx)):
        for c in range(len(cc)):
            ch="{0:0=2d}".format(cc[c])
            
            F1=xr.open_dataset(fs.open(fs.glob('s3://noaa-goes16/ABI-L1b-RadF/'+str(yr)+'/'+str(dy)+'/'+str(hr)+'/'+'OR_ABI-L1b-RadF-M3C'+ch+'*')[0]))

            F2=xr.open_dataset(fs.open(fs.glob('s3://noaa-goes16/ABI-L1b-RadF/'+str(yr)+'/'+str(dy)+'/'+str(hr)+'/'+'OR_ABI-L1b-RadF-M3C'+ch+'*')[-1]))
           
            G1 = F1.where((F1.x >= (xx[i]-0.005)) & (F1.x <= (xx[i]+0.005)) & (F1.y >= (yy[i]-0.005)) & (F1.y <= (yy[i]+0.005)), drop=True)
            G2 = F2.where((F2.x >= (xx[i]-0.005)) & (F2.x <= (xx[i]+0.005)) & (F2.y >= (yy[i]-0.005)) & (F2.y <= (yy[i]+0.005)), drop=True)
            
            G = xr.concat([G1, G2], dim  = 'time')
            G = G.assign_coords(channel=(ch))
            
            if c == 0:
                T = G    
            else:
                T = xr.concat([T, G], dim = 'channel')
                
        T = T.assign_coords(temp=(str(l['temp'][i])))
        print(l.iloc[i]['num'])
        path = name+'_'+str(int(l.iloc[i]['num']))+'.nc'          
        T.to_netcdf(path)
    
#zipping the file
        with zipfile.ZipFile(name+'_'+str(int(l.iloc[i]['num']))+'.zip', 'w', compression=zipfile.ZIP_DEFLATED) as zf:
            zf.write(path, arcname=str(name+'_'+str(int(l.iloc[i]['num']))+'.nc'))

 # Storing it to S3    
        s3.Bucket(BUCKET).upload_file(path[:-3]+'.zip', "Output/" + path[:-3]+'.zip')
         

This is how I fetch the data from S3:

s3 = boto3.resource('s3')

s3client = boto3.client(
    's3',
    region_name='us-east-1'
)

bucketname = s3.Bucket('temp')
filedata = []
keys = []
names = []

for my_bucket_object in bucketname.objects.all():
    keys.append(my_bucket_object.key)
    
for i in range(1, 21):
    fileobj = s3client.get_object(
        Bucket='temp',
        Key=(keys[i]))

    filedata.append(fileobj['Body'].read())
    names.append(keys[i][10:-3])

Initially, I just want to run 20 files for testing purposes.

Here is where I create the Dask delayed tasks and compute them:

temp_files = []

for i in range(20):
    s3_ds = dask.delayed(get_temp)(filedata[i], names[i])
    temp_files.append(s3_ds)

temp_files = dask.compute(*temp_files)

Here is the complete error log:

distributed.protocol.pickle - INFO - Failed to serialize <function get_temp at 0x7f20a9cb8550>. Exception: cannot pickle '_thread.lock' object
---------------------------------------------------------------------------
KeyError                                  Traceback (most recent call last)
/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/worker.py in dumps_function(func)
   3319         with _cache_lock:
-> 3320             result = cache_dumps[func]
   3321     except KeyError:

/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/utils.py in __getitem__(self, key)
   1572     def __getitem__(self, key):
-> 1573         value = super().__getitem__(key)
   1574         self.data.move_to_end(key)

/srv/conda/envs/notebook/lib/python3.8/collections/__init__.py in __getitem__(self, key)
   1009             return self.__class__.__missing__(self, key)
-> 1010         raise KeyError(key)
   1011     def __setitem__(self, key, item): self.data[key] = item

KeyError: <function get_temp at 0x7f20a9cb8550>

During handling of the above exception, another exception occurred:

TypeError                                 Traceback (most recent call last)
/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/protocol/pickle.py in dumps(x, buffer_callback, protocol)
     52                 buffers.clear()
---> 53                 result = cloudpickle.dumps(x, **dump_kwargs)
     54         elif not _always_use_pickle_for(x) and b"__main__" in result:

/srv/conda/envs/notebook/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py in dumps(obj, protocol, buffer_callback)
     72             )
---> 73             cp.dump(obj)
     74             return file.getvalue()

/srv/conda/envs/notebook/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py in dump(self, obj)
    562         try:
--> 563             return Pickler.dump(self, obj)
    564         except RuntimeError as e:

TypeError: cannot pickle '_thread.lock' object

During handling of the above exception, another exception occurred:

TypeError                                 Traceback (most recent call last)
<ipython-input-77-fa46004f5919> in <module>
----> 1 temp_files = dask.compute(*temp_files)

/srv/conda/envs/notebook/lib/python3.8/site-packages/dask/base.py in compute(*args, **kwargs)
    450         postcomputes.append(x.__dask_postcompute__())
    451 
--> 452     results = schedule(dsk, keys, **kwargs)
    453     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    454 

/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/client.py in get(self, dsk, keys, restrictions, loose_restrictions, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
   2703         Client.compute: Compute asynchronous collections
   2704         """
-> 2705         futures = self._graph_to_futures(
   2706             dsk,
   2707             keys=set(flatten([keys])),

/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/client.py in _graph_to_futures(self, dsk, keys, restrictions, loose_restrictions, priority, user_priority, resources, retries, fifo_timeout, actors)
   2639                 {
   2640                     "op": "update-graph",
-> 2641                     "tasks": valmap(dumps_task, dsk),
   2642                     "dependencies": dependencies,
   2643                     "keys": list(map(tokey, keys)),

/srv/conda/envs/notebook/lib/python3.8/site-packages/cytoolz/dicttoolz.pyx in cytoolz.dicttoolz.valmap()

/srv/conda/envs/notebook/lib/python3.8/site-packages/cytoolz/dicttoolz.pyx in cytoolz.dicttoolz.valmap()

/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/worker.py in dumps_task(task)
   3356             return d
   3357         elif not any(map(_maybe_complex, task[1:])):
-> 3358             return {"function": dumps_function(task[0]), "args": warn_dumps(task[1:])}
   3359     return to_serialize(task)
   3360 

/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/worker.py in dumps_function(func)
   3320             result = cache_dumps[func]
   3321     except KeyError:
-> 3322         result = pickle.dumps(func, protocol=4)
   3323         if len(result) < 100000:
   3324             with _cache_lock:

/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/protocol/pickle.py in dumps(x, buffer_callback, protocol)
     58         try:
     59             buffers.clear()
---> 60             result = cloudpickle.dumps(x, **dump_kwargs)
     61         except Exception as e:
     62             logger.info("Failed to serialize %s. Exception: %s", x, e)

/srv/conda/envs/notebook/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py in dumps(obj, protocol, buffer_callback)
     71                 file, protocol=protocol, buffer_callback=buffer_callback
     72             )
---> 73             cp.dump(obj)
     74             return file.getvalue()
     75 

/srv/conda/envs/notebook/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py in dump(self, obj)
    561     def dump(self, obj):
    562         try:
--> 563             return Pickler.dump(self, obj)
    564         except RuntimeError as e:
    565             if "recursion" in e.args[0]:

TypeError: cannot pickle '_thread.lock' object

Can someone help me here and tell me what I am doing wrong? Is there any way other than Dask to do the parallel processing?

Update: I found that the error is thrown only when I upload the files to the S3 bucket; otherwise it works fine. But if I don't save the files in S3, I can't figure out where they end up. When I run this with Dask, it saves the files somewhere I cannot find. I am running my code in JupyterLab, and nothing gets saved in any directory there.

It took me a while to parse your code.

In the big function, you use s3fs to interact with your cloud storage, and that works well with xarray.

However, in your main code you use boto3 to list and open the S3 files. Those objects hold references to the client, which maintains a connection pool, and that is the thing that cannot be pickled.
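
You can reproduce the serialization failure directly, without Dask. This is a minimal sketch just to illustrate the point; the exact exception text may vary with your botocore version:

import pickle

import boto3

# The client keeps a connection pool guarded by thread locks, so standard
# pickling fails. cloudpickle hits the same wall when it tries to ship
# get_temp, because the function refers to the module-level `s3` resource.
client = boto3.client('s3', region_name='us-east-1')
try:
    pickle.dumps(client)
except TypeError as e:
    print(e)  # e.g. "cannot pickle '_thread.lock' object"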

s3fs is designed to work with Dask and makes sure that filesystem instances and OpenFile objects are picklable. Since you are already using it in part of your code, I would recommend using s3fs throughout (but of course I am biased, since I am the main author).
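
A minimal sketch of that, untested and assuming `get_temp` is adapted to read from a file-like object instead of raw bytes: `fsspec.open` gives you a lazy `OpenFile` that Dask can pickle and that is only really opened inside the worker.

import dask
import fsspec
import s3fs

fs = s3fs.S3FileSystem(anon=False)
keys = fs.ls('temp')[1:21]          # list key strings in the 'temp' bucket

temp_files = []
for key in keys:
    name = key.split('/')[-1][:-3]  # derive `name` from the key (adjust to your naming)
    # A lazy, picklable handle; inside get_temp you would do e.g.
    #     with file as f:
    #         f_zip = gzip.decompress(f.read())
    of = fsspec.open('s3://' + key, 'rb', anon=False)
    temp_files.append(dask.delayed(get_temp)(of, name))

temp_files = dask.compute(*temp_files)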

Alternatively, you could pass only the file names (as strings) and not open anything until you are inside the worker function. That would be "best practice": you should load data within the worker tasks rather than loading it on the client and passing the data around.
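
A rough sketch of that version (hypothetical, untested; it assumes `get_temp` is changed to take the S3 key, open it with its own `s3fs.S3FileSystem`, and upload the zipped result with `fs.put` instead of the module-level boto3 objects):

import dask
import s3fs

fs = s3fs.S3FileSystem(anon=False)
keys = fs.ls('temp')[1:21]   # plain strings: cheap to pickle and to ship to workers

temp_files = []
for key in keys:
    name = key.split('/')[-1][:-3]
    # get_temp now receives only strings; inside the task it would do e.g.
    #     fs = s3fs.S3FileSystem(anon=False)
    #     f_zip = gzip.decompress(fs.open(key, 'rb').read())
    # and upload the zipped output with fs.put(zip_path, 'temp/Output/' + zip_path)
    temp_files.append(dask.delayed(get_temp)(key, name))

temp_files = dask.compute(*temp_files)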