HBase-HappyBase: Socket Timeout Error for Larger Files, Works Fine with Smaller Ones

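I use the Python code below, which relies on the happybase module, to update HBase. It works well for files with fewer than 30k records, but once a file exceeds roughly 30k-35k records it throws a timeout error. I tried the options suggested in other Stack Overflow questions, such as editing hbase-site.xml and a few other things, but none of them helped. Has anyone run into the same problem?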

import socket

import happybase as hb

def loadIdPHSegmentPyBase() :
    # This standalone socket is never handed to happybase, so its timeout
    # has no effect on the Thrift connection opened below.
    s = socket.socket()
    s.settimeout(300)
    connection = hb.Connection('XXXXX',9090,timeout=None,compat='0.92',transport='buffered')
    table = connection.table('HBASE_D_L')
    # dirName is not defined in this snippet.
    ReqFileToLoad = ("%segment.txt" %(dirName))
    b = table.batch()
    with open('%s' %(ReqFileToLoad)) as ffile1 :
        for line in ffile1 :
            line = line.strip()
            line = line.split('|')
            #print line[7] ,
            if line[7] == 'PH' :
                b.put(line[0],{'ADDR_IDPH:PHMIDDLE_NAME':line[1],'ADDR_IDPH:PHSUR_NAME' :line[2],'ADDR_IDPH:PHFIRST_NAME' :line[3],'ADDR_IDPH:PHFILLER1' :line[4],'ADDR_IDPH:PHFILLER2' :line[5],'ADDR_IDPH:PHFILLER3' :line[6],'ADDR_IDPH:TELEPHONE_SUBSEGMENT_ID' :line[7],'ADDR_IDPH:TELEPHONE_TYPE_CODE' :line[8],'ADDR_IDPH:PUBLISHED_INDICATOR' :line[9],'ADDR_IDPH:TELEPHONE_NUMBER' :line[10]})
            else :
                b.put(line[0],{'ADDR_IDPH:IDMIDDLE_NAME':line[1],'ADDR_IDPH:IDSUR_NAME' :line[2],'ADDR_IDPH:IDFIRST_NAME' :line[3],'ADDR_IDPH:IDFILLER1' :line[4],'ADDR_IDPH:IDFILLER2' :line[5],'ADDR_IDPH:IDFILLER3' :line[6],'ADDR_IDPH:IDSUBSEGMENT_IDENTIFIER' :line[7],'ADDR_IDPH:ID_TYPE' :line[8],'ADDR_IDPH:ID_VALIDITY_INDICATOR' :line[9],'ADDR_IDPH:ID_VALUE' :line[11]})
    # Every row in the file is buffered client-side and sent in one request here.
    b.send()
    s.close()

The error I get with large files:

 File "thriftpy/protocol/cybin/cybin.pyx", line 429, in     cybin.TCyBinaryProtocol.read_message_begin (thriftpy/protocol/cybin/cybin.c:6325)
  File "thriftpy/protocol/cybin/cybin.pyx", line 60, in cybin.read_i32 (thriftpy/protocol/cybin/cybin.c:1546)
  File "thriftpy/transport/buffered/cybuffered.pyx", line 65, in thriftpy.transport.buffered.cybuffered.TCyBufferedTransport.c_read (thriftpy/transport/buffered/cybuffered.c:1881)
  File "thriftpy/transport/buffered/cybuffered.pyx", line 69, in thriftpy.transport.buffered.cybuffered.TCyBufferedTransport.read_trans (thriftpy/transport/buffered/cybuffered.c:1948)
  File "thriftpy/transport/cybase.pyx", line 61, in thriftpy.transport.cybase.TCyBuffer.read_trans (thriftpy/transport/cybase.c:1472)
  File "/usr/local/python27/lib/python2.7/site-packages/thriftpy/transport/socket.py", line 108, in read
    buff = self.sock.recv(sz)
socket.timeout: timed out

This is how the problem was solved:

with open('%s' %(ReqFileToLoad)) as ffile1 :
    for line in ffile1 :
        line = line.strip()
        line = line.split('|')
        #print line[7] ,
        # A fresh batch is created for every row, so each request stays small.
        b = table.batch()
        if line[7] == 'PH' :
            b.put(line[0],{'ADDR_IDPH:PHMIDDLE_NAME':line[1],'ADDR_IDPH:PHSUR_NAME' :line[2],'ADDR_IDPH:PHFIRST_NAME' :line[3],'ADDR_IDPH:PHFILLER1' :line[4],'ADDR_IDPH:PHFILLER2' :line[5],'ADDR_IDPH:PHFILLER3' :line[6],'ADDR_IDPH:TELEPHONE_SUBSEGMENT_ID' :line[7],'ADDR_IDPH:TELEPHONE_TYPE_CODE' :line[8],'ADDR_IDPH:PUBLISHED_INDICATOR' :line[9],'ADDR_IDPH:TELEPHONE_NUMBER' :line[10]})
        else :
            b.put(line[0],{'ADDR_IDPH:IDMIDDLE_NAME':line[1],'ADDR_IDPH:IDSUR_NAME' :line[2],'ADDR_IDPH:IDFIRST_NAME' :line[3],'ADDR_IDPH:IDFILLER1' :line[4],'ADDR_IDPH:IDFILLER2' :line[5],'ADDR_IDPH:IDFILLER3' :line[6],'ADDR_IDPH:IDSUBSEGMENT_IDENTIFIER' :line[7],'ADDR_IDPH:ID_TYPE' :line[8],'ADDR_IDPH:ID_VALIDITY_INDICATOR' :line[9],'ADDR_IDPH:ID_VALUE' :line[11]})
        # Send inside the loop; a single send() after the loop would only write the last row's batch.
        b.send()

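I suggest you use smaller batches, or don't use batches at all. A batch is a client-side buffer with no size limit, so sending it can produce a huge Thrift request. happybase also provides a helper for this: you can specify batch_size, and the batch will then be flushed periodically.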

https://happybase.readthedocs.io/en/latest/api.html#happybase.Table.batch
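
As a rough illustration of the batch_size suggestion, here is a minimal sketch. The names load_segment, row_to_columns, PH_COLUMNS and ID_COLUMNS are hypothetical helpers, not part of happybase; the column names and field positions are copied from the put() calls in the question, and the batch size of 1000 is an arbitrary starting point rather than a recommended value. String handling follows the Python 2 code in the question; under Python 3 you may need to encode keys and values to bytes.

```python
# (qualifier, field index) pairs taken from the question's put() calls.
PH_COLUMNS = [('PHMIDDLE_NAME', 1), ('PHSUR_NAME', 2), ('PHFIRST_NAME', 3),
              ('PHFILLER1', 4), ('PHFILLER2', 5), ('PHFILLER3', 6),
              ('TELEPHONE_SUBSEGMENT_ID', 7), ('TELEPHONE_TYPE_CODE', 8),
              ('PUBLISHED_INDICATOR', 9), ('TELEPHONE_NUMBER', 10)]
ID_COLUMNS = [('IDMIDDLE_NAME', 1), ('IDSUR_NAME', 2), ('IDFIRST_NAME', 3),
              ('IDFILLER1', 4), ('IDFILLER2', 5), ('IDFILLER3', 6),
              ('IDSUBSEGMENT_IDENTIFIER', 7), ('ID_TYPE', 8),
              ('ID_VALIDITY_INDICATOR', 9), ('ID_VALUE', 11)]


def row_to_columns(fields):
    """Build the ADDR_IDPH column dict for one split input line."""
    layout = PH_COLUMNS if fields[7] == 'PH' else ID_COLUMNS
    return dict(('ADDR_IDPH:' + name, fields[i]) for name, i in layout)


def load_segment(table, path, batch_size=1000):
    """Write each '|'-delimited line of `path` to `table`; return the row count.

    `table` is expected to be a happybase Table (anything with a compatible
    batch() method works).
    """
    rows = 0
    with open(path) as segment_file:
        # With batch_size set, the batch is flushed automatically whenever
        # that many mutations are buffered; leaving the `with` block sends
        # whatever is still pending.
        with table.batch(batch_size=batch_size) as batch:
            for line in segment_file:
                fields = line.strip().split('|')
                batch.put(fields[0], row_to_columns(fields))
                rows += 1
    return rows
```

With the connection from the question, a call would look like load_segment(connection.table('HBASE_D_L'), ReqFileToLoad). Compared with creating one batch per row, this keeps each Thrift request bounded while still avoiding a round trip for every single put.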