在 pandas 或 dask 中从数据库 table 读取大数据
Read Large data from database table in pandas or dask
我想从一个 table 中读取所有 10+ gb 数据的数据到数据帧中。当我尝试使用 read_sql
读取时,出现内存过载错误。我想对该数据进行一些处理并使用新数据更新 table。我怎样才能有效地做到这一点。我的电脑有 26gb 的内存,但数据最大为 11gb,但我仍然遇到内存过载错误。
在 Dask 中花费了很多时间。下面是代码。
import dateparser
import dask.dataframe as dd
import numpy as np
df = dd.read_sql_table('fbo_xml_json_raw_data', index_col='id', uri='postgresql://postgres:passwordk@address:5432/database')
def make_year(data):
if data and data.isdigit() and int(data) >= 0:
data = '20' + data
elif data and data.isdigit() and int(data) < 0:
data = '19' + data
return data
def response_date(data):
if data and data.isdigit() and int(data[-2:]) >= 0:
data = data[:-2] + '20' + data[-2:]
elif data and data.isdigit() and int(data[-2:]) < 0:
data = data[:-2] + '19' + data[-2:]
if data and dateparser.parse(data):
return dateparser.parse(data).date().strftime('%Y-%m-%d')
def parse_date(data):
if data and dateparser.parse(data):
return dateparser.parse(data).date().strftime('%Y-%m-%d')
df.ARCHDATE = df.ARCHDATE.apply(parse_date)
df.YEAR = df.YEAR.apply(make_year)
df.DATE = df.DATE + df.YEAR
df.DATE = df.DATE.apply(parse_date)
df.RESPDATE = df.RESPDATE.apply(response_date)
主要问题似乎是 pd.Series.apply
的独占使用。但是 apply
只是一个 行 Python 级循环 。在 Pandas 和 Dask 中会比较慢。对于性能关键代码,您应该支持按列操作。
其实dask.dataframe
支持useful subset的Pandas API。这里有几个例子:-
避免字符串操作
先将数据转换为数值类型;然后执行向量化操作。例如:
dd['YEAR'] = dd['YEAR'].astype(int)
dd['YEAR'] = dd['YEAR'].mask(dd['YEAR'] >= 0, 20)
dd['YEAR'] = dd['YEAR'].mask(dd['YEAR'] < 0, 19)
转换为日期时间
如果您有 datetime
个格式合适的字符串:
df['ARCHDATE'] = df['ARCHDATE'].astype('M8[us]')
另见 。
看这里:https://pandas.pydata.org/pandas-docs/stable/generated/pandas.read_sql.html
看到那个 chunksize
参数了吗?您可以将数据分块以适合内存。
它将 return 一个块读取对象,因此您可以在块上迭代地应用操作。
您也可以合并 multiprocessing
。
这将增加一层复杂性,因为您不再处理 DataFrame 本身,而是处理包含块的对象。
由于您使用的是 Dask
,因此 "should" 适用。我不确定 Dask 如何处理分块。我已经有一段时间没有接触 Pandas/Dask 兼容性了。
我想从一个 table 中读取所有 10+ gb 数据的数据到数据帧中。当我尝试使用 read_sql
读取时,出现内存过载错误。我想对该数据进行一些处理并使用新数据更新 table。我怎样才能有效地做到这一点。我的电脑有 26gb 的内存,但数据最大为 11gb,但我仍然遇到内存过载错误。
在 Dask 中花费了很多时间。下面是代码。
import dateparser
import dask.dataframe as dd
import numpy as np
df = dd.read_sql_table('fbo_xml_json_raw_data', index_col='id', uri='postgresql://postgres:passwordk@address:5432/database')
def make_year(data):
if data and data.isdigit() and int(data) >= 0:
data = '20' + data
elif data and data.isdigit() and int(data) < 0:
data = '19' + data
return data
def response_date(data):
if data and data.isdigit() and int(data[-2:]) >= 0:
data = data[:-2] + '20' + data[-2:]
elif data and data.isdigit() and int(data[-2:]) < 0:
data = data[:-2] + '19' + data[-2:]
if data and dateparser.parse(data):
return dateparser.parse(data).date().strftime('%Y-%m-%d')
def parse_date(data):
if data and dateparser.parse(data):
return dateparser.parse(data).date().strftime('%Y-%m-%d')
df.ARCHDATE = df.ARCHDATE.apply(parse_date)
df.YEAR = df.YEAR.apply(make_year)
df.DATE = df.DATE + df.YEAR
df.DATE = df.DATE.apply(parse_date)
df.RESPDATE = df.RESPDATE.apply(response_date)
主要问题似乎是 pd.Series.apply
的独占使用。但是 apply
只是一个 行 Python 级循环 。在 Pandas 和 Dask 中会比较慢。对于性能关键代码,您应该支持按列操作。
其实dask.dataframe
支持useful subset的Pandas API。这里有几个例子:-
避免字符串操作
先将数据转换为数值类型;然后执行向量化操作。例如:
dd['YEAR'] = dd['YEAR'].astype(int)
dd['YEAR'] = dd['YEAR'].mask(dd['YEAR'] >= 0, 20)
dd['YEAR'] = dd['YEAR'].mask(dd['YEAR'] < 0, 19)
转换为日期时间
如果您有 datetime
个格式合适的字符串:
df['ARCHDATE'] = df['ARCHDATE'].astype('M8[us]')
另见
看这里:https://pandas.pydata.org/pandas-docs/stable/generated/pandas.read_sql.html
看到那个 chunksize
参数了吗?您可以将数据分块以适合内存。
它将 return 一个块读取对象,因此您可以在块上迭代地应用操作。
您也可以合并 multiprocessing
。
这将增加一层复杂性,因为您不再处理 DataFrame 本身,而是处理包含块的对象。
由于您使用的是 Dask
,因此 "should" 适用。我不确定 Dask 如何处理分块。我已经有一段时间没有接触 Pandas/Dask 兼容性了。