Python: IOError 110 从磁盘读取时连接超时

Python: IOError 110 Connection timed out when reading from disk

我是 运行 Sun Grid Engine 超级计算集群上的一个 Python 脚本,它读取文件 ID 列表,将每个文件发送到工作进程进行分析,并为每个文件写入一个输出输入文件到磁盘。

问题是我在 worker 函数中的某处遇到了 IOError(110, 'Connection timed out'),我不确定为什么。我过去在发出严重延迟的网络请求时收到过这个错误,但在这种情况下,工作人员只是试图从磁盘读取数据。

我的问题是:什么会导致从磁盘读取时出现连接超时错误,如何解决这个错误?其他人可以提供的任何帮助将不胜感激。

完整脚本(minhash_text() 中出现 IOError):

from datasketch import MinHash
from multiprocessing import Pool
from collections import defaultdict
from nltk import ngrams
import json
import sys
import codecs
import config

cores = 24
window_len = 12
step = 4
worker_files = 50
permutations = 256
hashband_len = 4

def minhash_text(args):
  '''Return a list of hashband strings for an input doc'''
  try:
    file_id, path = args
    with codecs.open(path, 'r', 'utf8') as f:
      f = f.read()
    all_hashbands = []
    for window_idx, window in enumerate(ngrams(f.split(), window_len)):
      window_hashbands = []
      if window_idx % step != 0:
        continue
      minhash = MinHash(num_perm=permutations, seed=1)
      for ngram in set(ngrams(' '.join(window), 3)):
        minhash.update( ''.join(ngram).encode('utf8') )
      hashband_vals = []
      for i in minhash.hashvalues:
        hashband_vals.append(i)
        if len(hashband_vals) == hashband_len:
          window_hashbands.append( '.'.join([str(j) for j in hashband_vals]) )
          hashband_vals = []
      all_hashbands.append(window_hashbands)
    return {'file_id': file_id, 'hashbands': all_hashbands}
  except Exception as exc:
    print(' ! error occurred while processing', file_id, exc)
    return {'file_id': file_id, 'hashbands': []}

if __name__ == '__main__':

  file_ids = json.load(open('file_ids.json'))
  file_id_path_tuples = [(file_id, path) for file_id, path in file_ids.items()]

  worker_id = int(sys.argv[1])
  worker_ids = list(ngrams(file_id_path_tuples, worker_files))[worker_id]

  hashband_to_ids = defaultdict(list)
  pool = Pool(cores)

  for idx, result in enumerate(pool.imap(minhash_text, worker_ids)):
    print(' * processed', idx, 'results')
    file_id = result['file_id']
    hashbands = result['hashbands']
    for window_idx, window_hashbands in enumerate(hashbands):
      for hashband in window_hashbands:
        hashband_to_ids[hashband].append(file_id + '.' + str(window_idx))

  with open(config.out_dir + 'minhashes-' + str(worker_id) + '.json', 'w') as out:
    json.dump(dict(hashband_to_ids), out)

事实证明我对文件系统的打击太大了,对同一台服务器上的文件发出了太多的并发读取请求。该服务器在给定时间段内只能允许固定数量的读取,因此超过该限制的任何请求都会收到连接超时响应。

解决方案是将每个文件读取请求包装在一个 while 循环中。在 while 循环中,尝试从磁盘读取适当的文件。如果出现 Connection timed out 错误,请休眠一秒钟,然后重试。只有在文件被读取后,while 循环才会被打破。