需要跳过包含 "Value Error" 的行
Need to skip line containing "Value Error"
我正在尝试从 Teradata 服务器中提取一些遗留数据,但某些记录包含未在 python 中注册的奇怪字符,例如 "U+ffffffc2".
目前,
我正在使用 pyodbc 从 Teradata 中提取数据
将结果放入一个 numpy 数组(因为当我将它直接放入 pandas 时,它将所有列解释为字符串类型的单个列)
然后我将 numpy 数组转换为 pandas 数据框,将 Decimal("09809") 和 Date("2015,11,14") 等内容更改为 [09809," 11,14,2015"]
然后我尝试把它写到一个文件中,出现了这个错误
ValueError: character U+ffffffc2 is not in range [U+0000; U+10ffff]
我无权编辑此数据,所以从客户端的角度来看,我可以做些什么来跳过这些字符,或者最好是在尝试写入文件并出错之前先把它们删除?
目前,我有一个 "try and except" 块来跳过包含错误数据的查询结果,但我必须以至少 100 行为一块来查询数据。所以如果我只是跳过它,我一次就会丢失 100 行或更多。然而,正如我之前提到的,我更愿意保留该行,只删除其中的坏字符。
这是我的代码。 (也请随时指出任何不好的做法!)
#Python 3.4
#Python Teradata Extraction
#Created 01/28/16 by Maz Baig
#dependencies
import pyodbc
import numpy as np
import pandas as pd
import sys
import os
import psutil
from datetime import datetime
#Record when the script started so each exit path can print the total run duration
start_time=datetime.now()
#Handle on the current process; StatusUpdate reads its memory_info() for the progress line
process=psutil.Process(os.getpid())
def ResultIter(curs, arraysize):
    """Yield the rows of an executed query in batches.

    Args:
        curs: A cursor exposing ``fetchmany(size)`` on which a query has
            already been executed.
        arraysize: Maximum number of rows requested per ``fetchmany`` call.

    Yields:
        Each non-empty batch exactly as returned by ``curs.fetchmany``.
        Iteration ends at the first empty batch.
    """
    while True:
        results = curs.fetchmany(arraysize)
        if not results:
            break
        yield results
def WriteResult(curs,file_path,full_count):
    """Append every row of an executed query to ``file_path`` as delimited text.

    Rows are fetched 100 at a time through ResultIter, converted to a pandas
    DataFrame and written as U+0001-separated, latin-1 encoded lines with no
    header and no index column.

    Args:
        curs: Cursor on which the extraction query has already been executed.
        file_path: Path of the output file; it is opened in append mode.
        full_count: Number of rows the caller expects to be extracted.

    Exits the process with status 3 (after printing the run duration) when
    fewer rows were fetched than ``full_count``.
    """
    rate=100
    rows_extracted=0
    rows_skipped=0
    #Open the output once and append every batch through this handle. The
    #previous code passed the path to to_csv, which reopened the file in write
    #mode and overwrote the earlier batches.
    #errors='ignore' drops characters latin-1 cannot represent instead of
    #raising; newline='' keeps the line endings pandas produced untouched.
    with open(file_path,'a',encoding='latin-1',errors='ignore',newline='') as f:
        for result in ResultIter(curs,rate):
            table_matrix=np.array(result)
            rows=table_matrix.shape[0]
            df = pd.DataFrame(table_matrix)
            #give the user a status update
            rows_extracted=rows+rows_extracted
            StatusUpdate(rows_extracted,full_count)
            try:
                #Render to a string first so a failure cannot leave half a batch in the file
                f.write(df.to_csv(sep='\u0001',header=False,index=False))
            except ValueError:
                #Something in this batch cannot be rendered. Retry one row at a
                #time so only the offending rows are lost, not the whole batch.
                for position in range(rows):
                    try:
                        f.write(df.iloc[[position]].to_csv(sep='\u0001',header=False,index=False))
                    except ValueError:
                        rows_skipped+=1
                        print("This record was giving you issues")
                        #Report the position only: printing the value itself may raise again
                        print("Row "+str(rows_extracted-rows+position+1)+" was skipped")
    print('\n')
    if rows_skipped:
        print(str(rows_skipped)+" record(s) could not be written and were skipped")
    if (rows_extracted < full_count):
        print("All of the records were not extracted")
        #print the run duration
        print("Duration: "+str(datetime.now() - start_time))
        sys.exit(3)
#Print a one-line progress report: rows extracted so far, the expected total, and this process's memory usage.
#NOTE(review): the print statement below is cut off mid-expression in this paste (it ends at "rss/78");
#the rest of the divisor and the closing parentheses are missing and must be restored from the original script.
def StatusUpdate(rows_ex,full_count):
print(" ::Rows Extracted:"+str(rows_ex)+" of "+str(full_count)+" | Memory Usage: "+str(process.memory_info().rss/78
def main(args):
    """Extract one Teradata table or view into a dated ``.dat`` file.

    Args:
        args: argv-style list laid out as
            [script, user, password, view_name, table_name, run_date,
            select_statement]. The select_statement entry may be omitted or
            empty, in which case every column ('*') is selected.

    Always ends through sys.exit:
        0 - the table was extracted,
        1 - too few arguments were supplied,
        2 - the output file already exists,
        3 - raised inside WriteResult when fewer rows than counted were fetched.
    """
    if len(args) < 6:
        print("Usage: <script> user password view_name table_name run_date [select_statement]")
        sys.exit(1)
    #get Username and Password
    #NOTE(review): a password passed on the command line is visible in the process list
    usr = args[1]
    pwd = args[2]
    #Define Table
    view_name=args[3]
    table_name=args[4]
    run_date=args[5]
    #get the select statement as an input; fall back to every column
    select_statement=args[6] if len(args) > 6 else ''
    if select_statement=='':
        select_statement='*'
    #create the output filename from tablename and run date
    file_name=run_date + "_" + table_name +"_hist.dat"
    file_path="/prod/data/cohl/rfnry/cohl_mort_loan_perfnc/temp/"+file_name
    if os.path.exists(file_path):
        print("AlreadyThere Exception\nThe file already exists at "+file_path+". Please remove it before continuing\n")
        #print the scripts run duration
        print("Duration: "+str(datetime.now() - start_time))
        sys.exit(2)
    #create connection
    print("Logging In")
    con_str = 'DRIVER={Teradata};DBCNAME=oneview;UID='+usr+';PWD='+pwd+';QUIETMODE=YES;'
    conn = pyodbc.connect(con_str)
    print("Logged In")
    #The finally clause also runs when WriteResult leaves through sys.exit,
    #so the connection is released on every path
    try:
        #NOTE(review): view_name, table_name and select_statement are pasted into
        #the SQL text as-is (identifiers cannot be bound as parameters), so they
        #must only ever come from a trusted operator
        #Get number of records in the file
        count_query = 'select count (*) from '+view_name+'.'+table_name
        count_curs = conn.cursor()
        count_curs.execute(count_query)
        full_count = count_curs.fetchone()[0]
        #Generate query to retrieve all of the table data
        query = 'select '+select_statement+' from '+view_name+'.'+table_name
        #create cursor
        curs = conn.cursor()
        #execute query
        curs.execute(query)
        #write the contents of the query to the output file
        print("Writting Result Into File Now")
        WriteResult(curs,file_path,full_count)
    finally:
        conn.close()
    print("Table: "+table_name+" was successfully extracted")
    #print the scripts run duration
    print("Duration: "+str(datetime.now() - start_time))
    sys.exit(0)


#Only run the extraction when executed as a script, not when imported
if __name__ == "__main__":
    main(sys.argv)
谢谢,
马兹
如果出错的只是这种 4 字节的 Unicode 码点,下面的方法可能会有所帮助。
一种解决方案是使用 codecs.register_error 注册自定义错误处理程序,这将过滤掉错误点,然后尝试解码:
import codecs
def error_handler(error):
    """Codec error handler that drops the failing sequence and the six bytes after it.

    Args:
        error: The UnicodeError instance passed in by the codec machinery.

    Returns:
        A ``(replacement, resume_position)`` tuple: an empty replacement string
        and the index at which the codec should continue.
    """
    #Resume six positions past the failing sequence, but never beyond the end
    #of the input: an out-of-range resume position makes the codec raise IndexError
    resume_at = min(error.end+6, len(error.object))
    return '', resume_at
codecs.register_error('nonunicode', error_handler)
b'abc\xffffffc2def'.decode(errors='nonunicode')
# gives you 'abcdef', which is exactly what you want
您可以进一步改进处理程序以捕获更复杂的错误,有关详细信息,请参阅https://docs.python.org/3/library/exceptions.html#UnicodeError and https://docs.python.org/3/library/codecs.html#codecs.register_error
我正在尝试从 Teradata 服务器中提取一些遗留数据,但某些记录包含未在 python 中注册的奇怪字符,例如 "U+ffffffc2".
目前,
我正在使用 pyodbc 从 Teradata 中提取数据
将结果放入一个 numpy 数组(因为当我将它直接放入 pandas 时,它将所有列解释为字符串类型的单个列)
然后我将 numpy 数组转换为 pandas 数据框,将 Decimal("09809") 和 Date("2015,11,14") 等内容更改为 [09809," 11,14,2015"]
然后我尝试把它写到一个文件中,出现了这个错误
ValueError: character U+ffffffc2 is not in range [U+0000; U+10ffff]
我无权编辑此数据,所以从客户端的角度来看,我可以做些什么来跳过这些字符,或者最好是在尝试写入文件并出错之前先把它们删除?
目前,我有一个 "try and except" 块来跳过包含错误数据的查询结果,但我必须以至少 100 行为一块来查询数据。所以如果我只是跳过它,我一次就会丢失 100 行或更多。然而,正如我之前提到的,我更愿意保留该行,只删除其中的坏字符。
这是我的代码。 (也请随时指出任何不好的做法!)
#Python 3.4
#Python Teradata Extraction
#Created 01/28/16 by Maz Baig
#dependencies
import pyodbc
import numpy as np
import pandas as pd
import sys
import os
import psutil
from datetime import datetime
#Record when the script started so each exit path can print the total run duration
start_time=datetime.now()
#Handle on the current process; StatusUpdate reads its memory_info() for the progress line
process=psutil.Process(os.getpid())
def ResultIter(curs, arraysize):
    """Yield the rows of an executed query in batches.

    Args:
        curs: A cursor exposing ``fetchmany(size)`` on which a query has
            already been executed.
        arraysize: Maximum number of rows requested per ``fetchmany`` call.

    Yields:
        Each non-empty batch exactly as returned by ``curs.fetchmany``.
        Iteration ends at the first empty batch.
    """
    while True:
        results = curs.fetchmany(arraysize)
        if not results:
            break
        yield results
def WriteResult(curs,file_path,full_count):
    """Append every row of an executed query to ``file_path`` as delimited text.

    Rows are fetched 100 at a time through ResultIter, converted to a pandas
    DataFrame and written as U+0001-separated, latin-1 encoded lines with no
    header and no index column.

    Args:
        curs: Cursor on which the extraction query has already been executed.
        file_path: Path of the output file; it is opened in append mode.
        full_count: Number of rows the caller expects to be extracted.

    Exits the process with status 3 (after printing the run duration) when
    fewer rows were fetched than ``full_count``.
    """
    rate=100
    rows_extracted=0
    rows_skipped=0
    #Open the output once and append every batch through this handle. The
    #previous code passed the path to to_csv, which reopened the file in write
    #mode and overwrote the earlier batches.
    #errors='ignore' drops characters latin-1 cannot represent instead of
    #raising; newline='' keeps the line endings pandas produced untouched.
    with open(file_path,'a',encoding='latin-1',errors='ignore',newline='') as f:
        for result in ResultIter(curs,rate):
            table_matrix=np.array(result)
            rows=table_matrix.shape[0]
            df = pd.DataFrame(table_matrix)
            #give the user a status update
            rows_extracted=rows+rows_extracted
            StatusUpdate(rows_extracted,full_count)
            try:
                #Render to a string first so a failure cannot leave half a batch in the file
                f.write(df.to_csv(sep='\u0001',header=False,index=False))
            except ValueError:
                #Something in this batch cannot be rendered. Retry one row at a
                #time so only the offending rows are lost, not the whole batch.
                for position in range(rows):
                    try:
                        f.write(df.iloc[[position]].to_csv(sep='\u0001',header=False,index=False))
                    except ValueError:
                        rows_skipped+=1
                        print("This record was giving you issues")
                        #Report the position only: printing the value itself may raise again
                        print("Row "+str(rows_extracted-rows+position+1)+" was skipped")
    print('\n')
    if rows_skipped:
        print(str(rows_skipped)+" record(s) could not be written and were skipped")
    if (rows_extracted < full_count):
        print("All of the records were not extracted")
        #print the run duration
        print("Duration: "+str(datetime.now() - start_time))
        sys.exit(3)
#Print a one-line progress report: rows extracted so far, the expected total, and this process's memory usage.
#NOTE(review): the print statement below is cut off mid-expression in this paste (it ends at "rss/78");
#the rest of the divisor and the closing parentheses are missing and must be restored from the original script.
def StatusUpdate(rows_ex,full_count):
print(" ::Rows Extracted:"+str(rows_ex)+" of "+str(full_count)+" | Memory Usage: "+str(process.memory_info().rss/78
def main(args):
    """Extract one Teradata table or view into a dated ``.dat`` file.

    Args:
        args: argv-style list laid out as
            [script, user, password, view_name, table_name, run_date,
            select_statement]. The select_statement entry may be omitted or
            empty, in which case every column ('*') is selected.

    Always ends through sys.exit:
        0 - the table was extracted,
        1 - too few arguments were supplied,
        2 - the output file already exists,
        3 - raised inside WriteResult when fewer rows than counted were fetched.
    """
    if len(args) < 6:
        print("Usage: <script> user password view_name table_name run_date [select_statement]")
        sys.exit(1)
    #get Username and Password
    #NOTE(review): a password passed on the command line is visible in the process list
    usr = args[1]
    pwd = args[2]
    #Define Table
    view_name=args[3]
    table_name=args[4]
    run_date=args[5]
    #get the select statement as an input; fall back to every column
    select_statement=args[6] if len(args) > 6 else ''
    if select_statement=='':
        select_statement='*'
    #create the output filename from tablename and run date
    file_name=run_date + "_" + table_name +"_hist.dat"
    file_path="/prod/data/cohl/rfnry/cohl_mort_loan_perfnc/temp/"+file_name
    if os.path.exists(file_path):
        print("AlreadyThere Exception\nThe file already exists at "+file_path+". Please remove it before continuing\n")
        #print the scripts run duration
        print("Duration: "+str(datetime.now() - start_time))
        sys.exit(2)
    #create connection
    print("Logging In")
    con_str = 'DRIVER={Teradata};DBCNAME=oneview;UID='+usr+';PWD='+pwd+';QUIETMODE=YES;'
    conn = pyodbc.connect(con_str)
    print("Logged In")
    #The finally clause also runs when WriteResult leaves through sys.exit,
    #so the connection is released on every path
    try:
        #NOTE(review): view_name, table_name and select_statement are pasted into
        #the SQL text as-is (identifiers cannot be bound as parameters), so they
        #must only ever come from a trusted operator
        #Get number of records in the file
        count_query = 'select count (*) from '+view_name+'.'+table_name
        count_curs = conn.cursor()
        count_curs.execute(count_query)
        full_count = count_curs.fetchone()[0]
        #Generate query to retrieve all of the table data
        query = 'select '+select_statement+' from '+view_name+'.'+table_name
        #create cursor
        curs = conn.cursor()
        #execute query
        curs.execute(query)
        #write the contents of the query to the output file
        print("Writting Result Into File Now")
        WriteResult(curs,file_path,full_count)
    finally:
        conn.close()
    print("Table: "+table_name+" was successfully extracted")
    #print the scripts run duration
    print("Duration: "+str(datetime.now() - start_time))
    sys.exit(0)


#Only run the extraction when executed as a script, not when imported
if __name__ == "__main__":
    main(sys.argv)
谢谢,
马兹
如果出错的只是这种 4 字节的 Unicode 码点,下面的方法可能会有所帮助。 一种解决方案是使用 codecs.register_error 注册自定义错误处理程序,它会过滤掉出错的码点,然后再尝试解码:
import codecs
def error_handler(error):
    """Codec error handler that drops the failing sequence and the six bytes after it.

    Args:
        error: The UnicodeError instance passed in by the codec machinery.

    Returns:
        A ``(replacement, resume_position)`` tuple: an empty replacement string
        and the index at which the codec should continue.
    """
    #Resume six positions past the failing sequence, but never beyond the end
    #of the input: an out-of-range resume position makes the codec raise IndexError
    resume_at = min(error.end+6, len(error.object))
    return '', resume_at
codecs.register_error('nonunicode', error_handler)
b'abc\xffffffc2def'.decode(errors='nonunicode')
# gives you 'abcdef', which is exactly what you want
您可以进一步改进处理程序以捕获更复杂的错误,有关详细信息,请参阅https://docs.python.org/3/library/exceptions.html#UnicodeError and https://docs.python.org/3/library/codecs.html#codecs.register_error