Unable to persist dask dataframe after read_sql_table

I am trying to read a database table into a dask dataframe and then persist the dataframe. I have tried a few variations, and they either run out of memory or end in an error.

I am working on a Windows 10 laptop with 8 GB of RAM. The trouble started when I tried to read large MySQL and Oracle database tables, and I can reproduce the problem with SQLite.

Here is the code to set up a 700 MB SQLite table to reproduce the problem. (Please forgive any clumsiness in the Python code; I have been a SAS data analyst for 10 years and am looking for a cheaper alternative, so I am new to Python, numpy, pandas, and dask. Note that SAS can read the SQLite table, write it to disk, and create an index in 90 seconds without locking up the laptop.)

import numpy as np
from sqlalchemy import create_engine, MetaData, Table, Column, Integer
import sqlite3

# function to create a list of dicts with chunkSize rows by 3 columns
# except for the first column, the columns are filled with random integers

def getChunk(chunkSize, prevCount):
    x = np.random.randint(low=0, high=2**32, size=(chunkSize,3), dtype='int64')
    y = x.ravel().view(dtype=[('a', 'i8'), ('b', 'i8'), ('c', 'i8')])
    y['a'] = [k + prevCount for k in range(chunkSize)]
    names = y.dtype.names
    listOfDicts = [dict(zip(names, row)) for row in y] 
    return listOfDicts

# set up a SQLAlchemy engine to a sqlite DB

dbPath = "C:\temp2\test.db"
connString = "sqlite:///{}".format(dbPath)
engine = create_engine(connString)

# create a table with 3 Integer columns

metadata = MetaData()
testTable = Table('testTbl', metadata,
                  Column('a', Integer, primary_key=True),
                  Column('b', Integer),
                  Column('c', Integer)
                 )

metadata.create_all(engine)
conn = engine.connect()

chunkSize = 25000
numberChunks = 1400

sqlite3.register_adapter(np.int64, lambda x: int(x))

# use the SQLAlchemy table insert method to load list of dicts into the table, one chunk at a time
prevCount = 0

with conn.begin():
    for i in range(numberChunks):
        listOfDicts = getChunk(chunkSize, prevCount)
        conn.execute(testTable.insert(), listOfDicts)
        prevCount = prevCount + chunkSize

conn.close()

I have tried 4 variations with the dask scheduler:

  1. The default scheduler -- this ran out of memory and the laptop locked up.

  2. The local distributed scheduler with multiple processes -- this gave a tornado exception.

  3. The local distributed scheduler with one process -- this ran out of memory.

  4. dask-scheduler and dask-worker started from the command line, with the worker memory limited to 3 GB. This variation resulted in an error and the worker being killed.

The code for each variation follows. How can I make this work?

1. Default scheduler -- OOM

# default scheduler -- OOM
import dask.dataframe as ddf
from dask.distributed import Client
import dask
import chest

cache = chest.Chest(path=r'c:\temp2', available_memory=8e9)
dask.set_options(cache=cache)
dbPath = r"C:\temp2\test.db"
connString = "sqlite:///{}".format(dbPath)
df = ddf.read_sql_table('testTbl', connString, index_col = 'a')
df = df.persist() 

2. Local distributed scheduler

    import dask.dataframe as ddf
    from dask.distributed import Client
    import dask
    import chest
    cache = chest.Chest(path=r'c:\temp2', available_memory=8e9)
    dask.set_options(cache=cache)
    client = Client()
    dbPath = r"C:\temp2\test.db"
    connString = "sqlite:///{}".format(dbPath)
    df = ddf.read_sql_table('testTbl', connString, index_col = 'a')
    df = client.persist(df)
    

The exception starts like this:

>>> tornado.application - ERROR - Exception in callback <bound method Nanny.memory_monitor of <Nanny: tcp://127.0.0.1:57522, threads: 1>>
Traceback (most recent call last):
  File "C:\Program Files\Python36\lib\site-packages\psutil\_pswindows.py", line 635, in wrapper
    return fun(self, *args, **kwargs)
  File "C:\Program Files\Python36\lib\site-packages\psutil\_pswindows.py", line 821, in create_time
    return cext.proc_create_time(self.pid)
ProcessLookupError: [Errno 3] No such process

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "C:\Program Files\Python36\lib\site-packages\psutil\__init__.py", line 368, in _init
    self.create_time()
  File "C:\Program Files\Python36\lib\site-packages\psutil\__init__.py", line 699, in create_time
    self._create_time = self._proc.create_time()
  File "C:\Program Files\Python36\lib\site-packages\psutil\_pswindows.py", line 640, in wrapper
    raise NoSuchProcess(self.pid, self._name)
psutil._exceptions.NoSuchProcess: psutil.NoSuchProcess process no longer exists (pid=14212)

3. One process -- OOM

    import dask.dataframe as ddf
    from dask.distributed import Client
    import dask
    import chest
    cache = chest.Chest(path=r'c:\temp2', available_memory=8e9)
    dask.set_options(cache=cache, get=dask.get)
    client = Client(processes=False)
    dbPath = r"C:\temp2\test.db"
    connString = "sqlite:///{}".format(dbPath)
    df = ddf.read_sql_table('testTbl', connString, index_col = 'a')
    df = client.persist(df, get=dask.get)
    
4. dask-scheduler, dask-worker

In one command line: c:>dask-scheduler --host 127.0.0.1

In another command line: c:>dask-worker 127.0.0.1:8786 --nprocs 1 --nthreads 1 --name worker-1 --memory-limit 3GB --local-directory c:\temp2

import dask.dataframe as ddf
from dask.distributed import Client
import dask
import chest
cache = chest.Chest(path=r'c:\temp2', available_memory=8e9)
dask.set_options(cache=cache)
client = Client(address="127.0.0.1:8786")
dbPath = r"C:\temp2\test.db"
connString = "sqlite:///{}".format(dbPath)
df = ddf.read_sql_table('testTbl', connString, index_col = 'a')
df = client.persist(df)

The worker gets killed over and over with these messages:

distributed.worker - WARNING - Memory use is high but worker has no data to store to disk.  Perhaps some other process is leaking memory?  Process memory: 2.12 GB -- Worker memory limit: 3.00 GB
distributed.worker - WARNING - Memory use is high but worker has no data to store to disk.  Perhaps some other process is leaking memory?  Process memory: 2.16 GB -- Worker memory limit: 3.00 GB
distributed.worker - WARNING - Memory use is high but worker has no data to store to disk.  Perhaps some other process is leaking memory?  Process memory: 2.24 GB -- Worker memory limit: 3.00 GB
distributed.worker - WARNING - Memory use is high but worker has no data to store to disk.  Perhaps some other process is leaking memory?  Process memory: 2.31 GB -- Worker memory limit: 3.00 GB
distributed.worker - WARNING - Memory use is high but worker has no data to store to disk.  Perhaps some other process is leaking memory?  Process memory: 2.39 GB -- Worker memory limit: 3.00 GB
distributed.worker - WARNING - Worker is at 81% memory usage. Pausing worker.  Process memory: 2.46 GB -- Worker memory limit: 3.00 GB
distributed.worker - WARNING - Memory use is high but worker has no data to store to disk.  Perhaps some other process is leaking memory?  Process memory: 2.47 GB -- Worker memory limit: 3.00 GB
distributed.worker - WARNING - Memory use is high but worker has no data to store to disk.  Perhaps some other process is leaking memory?  Process memory: 2.54 GB -- Worker memory limit: 3.00 GB
distributed.worker - WARNING - Memory use is high but worker has no data to store to disk.  Perhaps some other process is leaking memory?  Process memory: 2.61 GB -- Worker memory limit: 3.00 GB
distributed.worker - WARNING - Memory use is high but worker has no data to store to disk.  Perhaps some other process is leaking memory?  Process memory: 2.66 GB -- Worker memory limit: 3.00 GB
distributed.worker - WARNING - Memory use is high but worker has no data to store to disk.  Perhaps some other process is leaking memory?  Process memory: 2.73 GB -- Worker memory limit: 3.00 GB
distributed.worker - WARNING - Memory use is high but worker has no data to store to disk.  Perhaps some other process is leaking memory?  Process memory: 2.81 GB -- Worker memory limit: 3.00 GB
distributed.nanny - WARNING - Worker exceeded 95% memory budget.  Restarting
distributed.nanny - WARNING - Worker process 17916 was killed by signal 15
distributed.nanny - WARNING - Restarting worker

I don't believe you have an index on column 'a', which means that each partition access probably uses a lot of memory in sqlite while scanning the table. In any case, pandas' access of databases via sqlalchemy is not particularly memory efficient, so I am not hugely surprised that you see a memory spike during access.
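
If the column really is not indexed on the database side, one thing worth trying is adding an index before reading. A minimal sketch with SQLAlchemy, reusing the connection string from the question (the index name ix_testTbl_a is just an illustration):

from sqlalchemy import create_engine, text

engine = create_engine(r"sqlite:///C:\temp2\test.db")

# create an index on column 'a' so that per-partition range queries
# on the index column do not require full table scans
with engine.begin() as conn:
    conn.execute(text("CREATE INDEX IF NOT EXISTS ix_testTbl_a ON testTbl (a)"))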

Either way, you can increase the number of partitions so that the data can be accessed. For example:

df = ddf.read_sql_table('testTbl', connString, index_col = 'a', npartitions=20)

or decrease the number of available threads/processes, so that there is more memory per thread.
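
For example, a sketch of a local client limited to one worker and one thread (the 6GB memory limit is an arbitrary choice for an 8 GB laptop, not a recommendation):

from dask.distributed import Client

# one worker with one thread: partitions are read one at a time and the
# whole memory budget is available to that single task
client = Client(n_workers=1, threads_per_worker=1, memory_limit='6GB')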

Note that chest is not helping you at all here: it can only hold finished results, and the memory spike happens while the data are being loaded (besides, distributed workers should spill to disk without a cache being supplied explicitly).
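
Putting that together, a sketch of variation 4 rewritten without chest, relying on the worker started with --memory-limit and --local-directory to spill on its own (npartitions=20 is just the example value from above, not a tuned number):

import dask.dataframe as ddf
from dask.distributed import Client

# connect to the scheduler started on the command line, as in variation 4
client = Client(address="127.0.0.1:8786")

# more, smaller partitions keep the per-task pandas chunks within memory
connString = r"sqlite:///C:\temp2\test.db"
df = ddf.read_sql_table('testTbl', connString, index_col='a', npartitions=20)
df = client.persist(df)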