Using multiprocessing with PyArrow's HdfsClient
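I have a top-level function that takes a tuple containing the path to a Parquet file and a column name.
The function loads only that column from the file, converts it to pandas, and then packs/serializes it into a standard format. Something like: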
import pyarrow as pa
import pyarrow.parquet as pq
from multiprocessing import Pool

def binarizer(file_data_tuple):
    ''' Read a Parquet column from a file, binarize and return'''
    path, col_name, col_meta, native = file_data_tuple
    if not native:
        # Either this or using a top level hdfs_con
        hdfs_con = pa.hdfs.connect(params)
    read_pq = pq.read_table if native else hdfs_con.read_parquet
    # Use the unpacked `path`; the name `filepath` is not defined in this function
    arrow_col = read_pq(path, columns = (col_name,))
    bin_col = imported_binarizng_function(arrow_col)
    return bin_col

def read_binarize_parallel(filepaths):
    ''' Setup parallel reading and binarizing of a parquet file'''
    # list of tuples containing the filepath, column name, meta, and mode
    pool_params = [(),..]
    pool = Pool()
    for file in filepaths:
        bin_cols = pool.map(binarizer, pool_params)
        chunk = b''.join(bin_cols)
        send_over_socket(chunk)
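This works when I use native mode, that is, when reading files from the local filesystem.
However, if I try to read from HDFS, I get strange (to me) Arrow errors, both when I open a connection in each process and when I try to share a single connection. Here is a condensed version of the error: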
[libprotobuf ERROR google/protobuf/message_lite.cc:123] Can't parse
message of type "Hdfs.Internal.RpcResponseHeaderProto" because it is
missing required fields: callId, status [libprotobuf ERROR
google/protobuf/message_lite.cc:123] Can't parse message of type
"Hdfs.Internal.RpcResponseHeaderProto" because it is missing required
fields: callId, status [libprotobuf ERROR
google/protobuf/message_lite.cc:123] Can't parse message of type
"Hdfs.Internal.RpcResponseHeaderProto" because it is missing required
fields: callId, status [libprotobuf ERROR
google/protobuf/message_lite.cc:123] Can't parse message of type
"Hdfs.Internal.RpcResponseHeaderProto" because it is missing required
fields: callId, status 2018-01-09 21:41:47.939006, p10007,
th139965275871040, ERROR Failed to invoke RPC call "getFileInfo" on
server "192.168.0.101:9000": RpcChannel.cpp: 703: HdfsRpcException:
RPC channel to "192.168.0.101:9000" got protocol mismatch: RPC channel
cannot find pending call: id = 3.
@ Unknown
@ Unknown
@ arrow::io::HadoopFileSystem::GetPathInfo(std::string const&, arrow::io::HdfsPathInfo*)
@ __pyx_f_7pyarrow_3lib_16HadoopFileSystem__path_info(__pyx_obj_7pyarrow_3lib_HadoopFileSystem*,
_object*, arrow::io::HdfsPathInfo*)
@ __pyx_pw_7pyarrow_3lib_16HadoopFileSystem_15isfile(_object*, _object*)
@ Unknown
@ Unknown
@ Unknown
2018-01-09 21:41:47.939103, p10007, th139965275871040, INFO Retry
idempotent RPC call "getFileInfo" on server "192.168.0.101:9000"
2018-01-09 21:41:47.939357, p10010, th139965275871040, ERROR Failed to
invoke RPC call "getFileInfo" on server "192.168.0.101:9000":
RpcChannel.cpp: 780: HdfsRpcException: RPC channel to
"192.168.0.101:9000" got protocol mismatch: RPC channel cannot parse
response header.
@ Unknown
@ Unknown
@ arrow::io::HadoopFileSystem::GetPathInfo(std::string const&, arrow::io::HdfsPathInfo*)
@ __pyx_f_7pyarrow_3lib_16HadoopFileSystem__path_info(__pyx_obj_7pyarrow_3lib_HadoopFileSystem*,
_object*, arrow::io::HdfsPathInfo*)
@ __pyx_pw_7pyarrow_3lib_16HadoopFileSystem_13isdir(_object*, _object*)
@ Unknown
@ Unknown
@ Unknown
@2018-01-09 21:41:47.939406, p10008, th139965275871040, ERROR Failed to invoke RPC call "getFileInfo" on server
"192.168.0.101:9000": RpcChannel.cpp: 780: HdfsRpcException: RPC
channel to "192.168.0.101:9000" got protocol mismatch: RPC channel
cannot parse response header.
@ Unknown
@ Unknown
@ arrow::io::HadoopFileSystem::GetPathInfo(std::string const&, arrow::io::HdfsPathInfo*)
@ __pyx_f_7pyarrow_3lib_16HadoopFileSystem__path_info(__pyx_obj_7pyarrow_3lib_HadoopFileSystem*,
_object*, arrow::io::HdfsPathInfo*)
@ __pyx_pw_7pyarrow_3lib_16HadoopFileSystem_13isdir(_object*, _object*)
@ Unknown
@ Unknown 2018-01-09 21:41:47.939422, p10013, th139965275871040, ERROR Failed to invoke RPC call "getFileInfo" on server
"192.168.0.101:9000": RpcChannel.cpp: 780: HdfsRpcException: RPC
channel to "192.168.0.101:9000" got protocol mismatch: RPC channel
cannot parse response header.
@ Unknown
@ Unknown
@ arrow::io::HadoopFileSystem::GetPathInfo(std::string const&, arrow::io::HdfsPathInfo*)
@ __pyx_f_7pyarrow_3lib_16HadoopFileSystem__path_info(__pyx_obj_7pyarrow_3lib_HadoopFileSystem*,
_object*, arrow::io::HdfsPathInfo*)
@ __pyx_pw_7pyarrow_3lib_16HadoopFileSystem_13isdir(_object*, _object*)
@ Unknown
@ Unknown
@2018-01-09 21:41:47.939431, p10009, th139965275871040, ERROR Failed to invoke RPC call "getFileInfo" on server
"192.168.0.101:9000": RpcChannel.cpp: 780: HdfsRpcException: RPC
channel to "192.168.0.101:9000" got protocol mismatch: RPC channel
cannot parse response header.
@ Unknown
@ Unknown
@ arrow::io::HadoopFileSystem::GetPathInfo(std::string const&, arrow::io::HdfsPathInfo*)
@ __pyx_f_7pyarrow_3lib_16HadoopFileSystem__path_info(__pyx_obj_7pyarrow_3lib_HadoopFileSystem*,
_object*, arrow::io::HdfsPathInfo*)
@ __pyx_pw_7pyarrow_3lib_16HadoopFileSystem_13isdir(_object*, _object*)
@ Unknown
@ Unknown
@ @ Unknown
Unknown 2018-01-09 21:41:47.939457, p10012, th139965275871040, ERROR Failed to invoke RPC call "getFileInfo" on server
"192.168.0.101:9000": RpcChannel.cpp: 780: HdfsRpcException: RPC
channel to "192.168.0.101:9000" got protocol mismatch: RPC channel
cannot parse response header.
@ Unknown
@ Unknown
@ arrow::io::HadoopFileSystem::GetPathInfo(std::string const&, arrow::io::HdfsPathInfo*)
@ __pyx_f_7pyarrow_3lib_16HadoopFileSystem__path_info(__pyx_obj_7pyarrow_3lib_HadoopFileSystem*,
_object*, arrow::io::HdfsPathInfo*)
@ __pyx_pw_7pyarrow_3lib_16HadoopFileSystem_13isdir(_object*, _object*)
@ Unknown
@ Unknown
@ Unknown
@ Unknown
Unknown
@ Unknown binarizing process filepath: /parquet_430mb/5e6.parquet
@ Unknown
Unknown
@ Unknown
@ Unknown
@ Unknown
2018-01-09 21:41:47.939854, p10010, th139965275871040, INFO Retry
idempotent RPC call "getFileInfo" on server "192.168.0.101:9000"
2018-01-09 21:41:47.939864, p10013, th139965275871040, INFO Retry
idempotent RPC call "getFileInfo" on server "192.168.0.101:9000"
2018-01-09 21:41:47.939866, p10008, th139965275871040, INFO Retry
idempotent RPC call "getFileInfo" on server "192.168.0.101:9000"
2018-01-09 21:41:47.939868, p10012, th139965275871040, INFO Retry
idempotent RPC call "getFileInfo" on server "192.168.0.101:9000"
2018-01-09 21:41:47.939868, p10009, th139965275871040, INFO Retry
idempotent RPC call "getFileInfo" on server "192.168.0.101:9000"
2018-01-09 21:41:47.940813, p10014, th139965275871040, ERROR Failed to
invoke RPC call "getFileInfo" on server "192.168.0.101:9000":
RpcChannel.cpp: 780: HdfsRpcException: RPC channel to
"192.168.0.101:9000" got protocol mismatch: RPC channel cannot parse
response header.
@ Unknown
@ Unknown
@ arrow::io::HadoopFileSystem::GetPathInfo(std::string const&, arrow::io::HdfsPathInfo*)
@ __pyx_f_7pyarrow_3lib_16HadoopFileSystem__path_info(__pyx_obj_7pyarrow_3lib_HadoopFileSystem*,
_object*, arrow::io::HdfsPathInfo*)
@ __pyx_pw_7pyarrow_3lib_16HadoopFileSystem_13isdir(_object*, _object*)
@ Unknown
@ Unknown
2018-01-09 21:41:47.940937, p10014, th139965275871040, INFO Retry
idempotent RPC call "getFileInfo" on server "192.168.0.101:9000"
2018-01-09 21:41:47.944352, p10011, th139965275871040, ERROR Failed to
invoke RPC call "getFileInfo" on server "192.168.0.101:9000":
RpcChannel.cpp: 393: HdfsRpcException: Failed to invoke RPC call
"getFileInfo" on server "192.168.0.101:9000"
@ Unknown
@ Unknown
@ Unknown
@ arrow::io::HadoopFileSystem::GetPathInfo(std::string const&, arrow::io::HdfsPathInfo*)
@ __pyx_f_7pyarrow_3lib_16HadoopFileSystem__path_info(__pyx_obj_7pyarrow_3lib_HadoopFileSystem*,
_object*, arrow::io::HdfsPathInfo*)
@ __pyx_pw_7pyarrow_3lib_16HadoopFileSystem_13isdir(_object*, _object*)
@ Unknown
@ Unknown Caused by TcpSocket.cpp: 127: HdfsNetworkException: Write 124 bytes failed to "192.168.0.101:9000": (errno: 32) Broken
pipe
@ Unknown
@ Unknown
@ Unknown
@ arrow::io::HadoopFileSystem::GetPathInfo(std::string const&, arrow::io::HdfsPathInfo*)
@ __pyx_f_7pyarrow_3lib_16HadoopFileSystem__path_info(__pyx_obj_7pyarrow_3lib_HadoopFileSystem*,
_object*, arrow::io::HdfsPathInfo*)
@ __pyx_pw_7pyarrow_3lib_16HadoopFileSystem_13isdir(_object*, _object*)
@ Unknown
@ Unknown
@ Unknown
2018-01-09 21:41:47.944519, p10011, th139965275871040, INFO Retry
idempotent RPC call "getFileInfo" on server "192.168.0.101:9000"
--------------------------------------------------------------------------- ArrowIOError Traceback (most recent call
last)
/home/parquet_sender.pyc in insert_files_parallel(self)
374 # print ('372 sqparquet filepath:', filepath)
375 params_with_path_and_mode = [col_params+(filepath, native) for col_params in pool_params]
--> 376 bin_col = self.pool.map(read_binarize, params_with_path_and_mode)
377 got ('map complete')
378 num_rows = bin_col[0][2]
/usr/lib/python2.7/multiprocessing/pool.pyc in map(self, func,
iterable, chunksize)
249 '''
250 assert self._state == RUN
--> 251 return self.map_async(func, iterable, chunksize).get()
252
253 def imap(self, func, iterable, chunksize=1):
/usr/lib/python2.7/multiprocessing/pool.pyc in get(self, timeout)
556 return self._value
557 else:
--> 558 raise self._value
559
560 def _set(self, i, obj):
ArrowIOError: HDFS: GetPathInfo failed
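I would appreciate any feedback on the cause of this error, and on how I should approach parallel Parquet loading.
This is a bug related to the details of how multiprocessing serializes objects. I opened a bug report here: https://issues.apache.org/jira/browse/ARROW-1986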
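For background on what "serialization details" can mean in this setting: multiprocessing pickles whatever it sends to worker processes, so an object that wraps a live connection has to decide what actually travels across the process boundary. The following is only a minimal sketch of that general idea, not a description of how pyarrow behaves or of how ARROW-1986 was resolved. `FakeConnection` and its `handle` attribute are hypothetical stand-ins invented for illustration; they are not part of pyarrow.

```python
import pickle


class FakeConnection(object):
    """Hypothetical stand-in for a client that owns a live connection."""

    def __init__(self, host, port):
        self.host = host
        self.port = port
        # Placeholder for per-process native state (socket, RPC call ids).
        self.handle = object()

    def __reduce__(self):
        # Pickle only the connection parameters. Unpickling calls
        # FakeConnection(host, port) again, so the receiving side builds
        # its own fresh handle instead of inheriting a copy of this one.
        return (FakeConnection, (self.host, self.port))


original = FakeConnection("192.168.0.101", 9000)
clone = pickle.loads(pickle.dumps(original))

print(clone.host == original.host)          # same parameters
print(clone.handle is not original.handle)  # separate connection state
```

The point of the sketch is that the parameters survive the round trip while the connection state is rebuilt on the receiving side, which is the property one would want from a client object that is passed to pool workers.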