GIL for IO-bound thread in C extension (HDF5)

I have a sampling application that acquires 250,000 samples per second, buffers them in memory, and eventually appends them to an HDFStore provided by pandas. In general, this works great. However, I have a thread that runs continually emptying the data acquisition device (DAQ), and it needs to run on a regular basis. A deviation of about a second tends to break things. Below is an extreme case of the timings observed. Start indicates a DAQ read starting, Finish indicates when the read ends, and IO indicates an HDF write (DAQ and IO both happen in separate threads).

Start        : 2016-04-07 12:28:22.241303
IO (1)       : 2016-04-07 12:28:22.241303
Finish       : 2016-04-07 12:28:46.573440 (0.16 Hz, 24331.26 ms)
IO Done (1)  : 2016-04-07 12:28:46.573440 (24332.39 ms)

As you can see, it takes 24 seconds for this write to execute (a typical write is about 40 ms). The HDD I'm writing to is not under load, so this delay shouldn't be caused by contention (utilisation is around 7% while it runs). I have disabled indexing on my HDFStore writes. My application runs numerous other threads, all of which print status strings, so it appears the IO task is blocking all the other threads. I've spent quite a bit of time stepping through the code to figure out where things slow down, and it is always inside a method provided by a C extension, which leads to my questions:

  1. Can Python (I'm using 3.5) preempt execution inside a C extension? Concurrency: Are Python extensions written in C/C++ affected by the Global Interpreter Lock? seems to indicate that it cannot, unless the extension explicitly yields.
  2. Does pandas' HDF5 C code implement any yielding for I/O? If so, does that mean the delay is due to a CPU-bound task? I have disabled indexing.
  3. Any suggestions for how I can get consistent timing? I'm considering moving the HDF5 code into another process. That only helps to a certain extent, though, since I can't really tolerate ~20 second writes anyway, especially when they're unpredictable.

Here is an example you can run to see the problem:

import pandas as pd
import numpy as np
from timeit import default_timer as timer
import datetime
import random
import threading
import time

def write_samples(store, samples, overwrite):
    frame = pd.DataFrame(samples, dtype='float64')

    if not overwrite:
        store.append("df", frame, format='table', index=False)
    else:
        store.put("df", frame, format='table', index=False)

def begin_io():
    store = pd.HDFStore(r"D:\slow\test" + str(random.randint(0,100)) + ".h5", mode='w', complevel=0)  # raw string: otherwise "\t" in the path is a tab

    counter = 0
    while True:
        data = np.random.rand(50000, 1)
        start_time = timer()
        write_samples(store, data, counter == 0)
        end_time = timer()

        print("IO Done      : %s (%.2f ms, %d)" % (datetime.datetime.now(), (end_time - start_time) * 1000, counter))

        counter += 1

    store.close()

def dummy_thread():
    previous = timer()
    while True:
        now = timer()
        print("Dummy Thread  : %s (%d ms)" % (datetime.datetime.now(), (now - previous) * 1000))
        previous = now
        time.sleep(0.01)


if __name__ == '__main__':
    threading.Thread(target=dummy_thread).start()
    begin_io()

You will get output similar to the following:

IO Done      : 2016-04-08 10:51:14.100479 (3.63 ms, 470)
Dummy Thread  : 2016-04-08 10:51:14.101484 (12 ms)
IO Done      : 2016-04-08 10:51:14.104475 (3.01 ms, 471)
Dummy Thread  : 2016-04-08 10:51:14.576640 (475 ms)
IO Done      : 2016-04-08 10:51:14.576640 (472.00 ms, 472)
Dummy Thread  : 2016-04-08 10:51:14.897756 (321 ms)
IO Done      : 2016-04-08 10:51:14.898782 (320.79 ms, 473)
IO Done      : 2016-04-08 10:51:14.901772 (3.29 ms, 474)
IO Done      : 2016-04-08 10:51:14.905773 (2.84 ms, 475)
IO Done      : 2016-04-08 10:51:14.908775 (2.96 ms, 476)
Dummy Thread  : 2016-04-08 10:51:14.909777 (11 ms)

The answer is no, these writers do not release the GIL. See the documentation here. I know you are not actually trying to write with multiple threads, but this should clue you in: strong locks are held while a write happens, precisely to prevent multiple writers. Both PyTables and h5py do this as part of the HDF5 standard.

You could look at SWMR, though it is not directly supported by pandas. The PyTables docs here and here point to solutions. These generally involve having a separate process pull data off a queue and write it.

In any event, this is generally a much more scalable pattern.
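As a rough sketch of that pattern (my own illustration, not the asker's code; the writer_process name, the data/queued.h5 path, and the None sentinel are inventions of this example), a dedicated writer process pulls sample blocks off a multiprocessing.Queue and appends them to the store, so the HDF5 calls never hold the GIL of the acquisition process:

import multiprocessing

import numpy as np
import pandas as pd

def writer_process(queue, fname):
    # Dedicated writer: drains the queue and appends to the HDF5 store.
    # A None item is used as a sentinel meaning "no more data".
    with pd.HDFStore(fname, mode='w', complevel=0) as store:
        first = True
        while True:
            samples = queue.get()
            if samples is None:
                break
            frame = pd.DataFrame(samples, dtype='float64')
            if first:
                store.put("df", frame, format='table', index=False)
                first = False
            else:
                store.append("df", frame, format='table', index=False)

if __name__ == '__main__':
    queue = multiprocessing.Queue()
    writer = multiprocessing.Process(target=writer_process,
                                     args=(queue, "data/queued.h5"))
    writer.start()
    for _ in range(10):
        queue.put(np.random.rand(50000, 1))  # acquisition side never blocks on HDF5
    queue.put(None)  # tell the writer to finish and close the file
    writer.join()

The acquisition side then only pays the small cost of pickling each array into the queue; the slow, lock-holding HDF5 work happens in a process with its own GIL.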

Thanks for the working code. I modified it to get some insight, and later created a modified version using multiprocessing.

Modified threaded version

All the modifications are there just to get more information; there is no conceptual change. Everything goes into one file, mthread.py, commented part by part.

Imports as usual:

import pandas as pd
import numpy as np
from timeit import default_timer as timer
import datetime
import random
import threading
import logging

write_samples got some logging:

def write_samples(store, samples, overwrite):
    wslog = logging.getLogger("write_samples")
    wslog.info("starting")
    frame = pd.DataFrame(samples, dtype='float64')

    if overwrite:
        store.put("df", frame, format='table', index=False)
    else:
        store.append("df", frame, format='table', index=False)
    wslog.info("finished")

begin_io got a maximum duration; exceeding it produces a WARNING log entry:

def begin_io(maxduration=500):
    iolog = logging.getLogger("begin_io")
    iolog.info("starting")
    try:
        fname = "data/tab" + str(random.randint(0, 100)) + ".h5"
        iolog.debug("opening store %s", fname)
        with pd.HDFStore(fname, mode='w', complevel=0) as store:
            iolog.debug("store %s open", fname)

            counter = 0
            while True:
                data = np.random.rand(50000, 1)
                start_time = timer()
                write_samples(store, data, counter == 0)
                end_time = timer()
                duration = (end_time - start_time) * 1000
                iolog.debug("IO Done      : %s (%.2f ms, %d)",
                            datetime.datetime.now(),
                            duration,
                            counter)
                if duration > maxduration:
                    iolog.warning("Long duration %s", duration)
                counter += 1
    except Exception:
        iolog.exception("oops")
    finally:
        iolog.info("finished")

dummy_thread was modified to stop properly, and to emit a WARNING if an iteration takes too long:

def dummy_thread(pill2kill, maxduration=500):
    dtlog = logging.getLogger("dummy_thread")
    dtlog.info("starting")
    try:
        previous = timer()
        while not pill2kill.wait(0.01):
            now = timer()
            duration = (now - previous) * 1000
            dtlog.info("Dummy Thread  : %s (%d ms)",
                       datetime.datetime.now(),
                       duration)
            if duration > maxduration:
                dtlog.warning("Long duration %s", duration)
            previous = now
        dtlog.debug("stopped looping.")
    except Exception:
        dtlog.exception("oops")
    finally:
        dtlog.info("finished")

Finally, we call it all. Feel free to modify the log levels: WARNING shows only the excessive times, INFO and DEBUG tell much more.

if __name__ == '__main__':
    logformat = '%(asctime)-15s [%(levelname)s] - %(name)s: %(message)s'
    logging.basicConfig(format=logformat,
                        level=logging.WARNING)

    pill2kill = threading.Event()
    t = threading.Thread(target=dummy_thread, args=(pill2kill, 500))
    t.start()
    try:
        begin_io(500)
    finally:
        pill2kill.set()
        t.join()

Running it, I get results just as you described:

2016-04-08 15:29:11,428 [WARNING] - begin_io: Long duration 5169.03591156
2016-04-08 15:29:11,429 [WARNING] - dummy_thread: Long duration 5161.45706177
2016-04-08 15:29:27,305 [WARNING] - begin_io: Long duration 1447.40581512
2016-04-08 15:29:27,306 [WARNING] - dummy_thread: Long duration 1450.75201988
2016-04-08 15:29:32,893 [WARNING] - begin_io: Long duration 1610.98194122
2016-04-08 15:29:32,894 [WARNING] - dummy_thread: Long duration 1612.98394203
2016-04-08 15:29:34,930 [WARNING] - begin_io: Long duration 823.182821274
2016-04-08 15:29:34,930 [WARNING] - dummy_thread: Long duration 815.275907516
2016-04-08 15:29:43,640 [WARNING] - begin_io: Long duration 510.369062424
2016-04-08 15:29:43,640 [WARNING] - dummy_thread: Long duration 511.776924133

From the values it is clear that while begin_io is very busy and delayed (probably while the data are being written to disk), dummy_thread is delayed by almost the same amount of time.

Multiprocessing version - works well

I modified the code to run in multiple processes, and since then it really does not block dummy_thread:

2016-04-08 15:38:12,487 [WARNING] - begin_io: Long duration 755.397796631
2016-04-08 15:38:14,127 [WARNING] - begin_io: Long duration 1434.60512161
2016-04-08 15:38:15,725 [WARNING] - begin_io: Long duration 848.396062851
2016-04-08 15:38:24,290 [WARNING] - begin_io: Long duration 1129.17089462
2016-04-08 15:38:25,609 [WARNING] - begin_io: Long duration 1059.08918381
2016-04-08 15:38:31,165 [WARNING] - begin_io: Long duration 646.969079971
2016-04-08 15:38:37,273 [WARNING] - begin_io: Long duration 1699.17201996
2016-04-08 15:38:43,788 [WARNING] - begin_io: Long duration 1555.341959
2016-04-08 15:38:47,765 [WARNING] - begin_io: Long duration 639.196872711
2016-04-08 15:38:54,269 [WARNING] - begin_io: Long duration 1690.57011604
2016-04-08 15:39:06,397 [WARNING] - begin_io: Long duration 1998.33416939
2016-04-08 15:39:16,980 [WARNING] - begin_io: Long duration 2558.51006508
2016-04-08 15:39:21,688 [WARNING] - begin_io: Long duration 1132.73501396
2016-04-08 15:39:26,450 [WARNING] - begin_io: Long duration 876.784801483
2016-04-08 15:39:29,809 [WARNING] - begin_io: Long duration 709.135055542
2016-04-08 15:39:31,748 [WARNING] - begin_io: Long duration 677.506923676
2016-04-08 15:39:41,854 [WARNING] - begin_io: Long duration 770.184993744

The multiprocessing code is here:

import pandas as pd
import numpy as np
from timeit import default_timer as timer
import datetime
import random
import multiprocessing
import time
import logging


def write_samples(store, samples, overwrite):
    wslog = logging.getLogger("write_samples")
    wslog.info("starting")
    frame = pd.DataFrame(samples, dtype='float64')

    if overwrite:
        store.put("df", frame, format='table', index=False)
    else:
        store.append("df", frame, format='table', index=False)
    wslog.info("finished")


def begin_io(pill2kill, maxduration=500):
    iolog = logging.getLogger("begin_io")
    iolog.info("starting")
    try:
        fname = "data/tab" + str(random.randint(0, 100)) + ".h5"
        iolog.debug("opening store %s", fname)
        with pd.HDFStore(fname, mode='w', complevel=0) as store:
            iolog.debug("store %s open", fname)

            counter = 0
            while not pill2kill.wait(0):
                data = np.random.rand(50000, 1)
                start_time = timer()
                write_samples(store, data, counter == 0)
                end_time = timer()
                duration = (end_time - start_time) * 1000
                iolog.debug("IO Done      : %s (%.2f ms, %d)",
                            datetime.datetime.now(),
                            duration,
                            counter)
                if duration > maxduration:
                    iolog.warning("Long duration %s", duration)
                counter += 1
    except Exception:
        iolog.exception("oops")
    finally:
        iolog.info("finished")


def dummy_thread(pill2kill, maxduration=500):
    dtlog = logging.getLogger("dummy_thread")
    dtlog.info("starting")
    try:
        previous = timer()
        while not pill2kill.wait(0.01):
            now = timer()
            duration = (now - previous) * 1000
            dtlog.info("Dummy Thread  : %s (%d ms)",
                       datetime.datetime.now(),
                       duration)
            if duration > maxduration:
                dtlog.warning("Long duration %s", duration)
            previous = now
        dtlog.debug("stopped looping.")
    except Exception:
        dtlog.exception("oops")
    finally:
        dtlog.info("finished")


if __name__ == '__main__':
    logformat = '%(asctime)-15s [%(levelname)s] - %(name)s: %(message)s'
    logging.basicConfig(format=logformat,
                        level=logging.WARNING)
    pill2kill = multiprocessing.Event()
    dp = multiprocessing.Process(target=dummy_thread, args=(pill2kill, 500,))
    dp.start()
    try:
        p = multiprocessing.Process(target=begin_io, args=(pill2kill, 500,))
        p.start()
        time.sleep(100)
    finally:
        pill2kill.set()
        dp.join()
        p.join()

Conclusions

Writing data to an HDF5 file really does block other threads, so the multiprocessing version is needed.

If you expect dummy_thread to do some real work (like collecting data to store) and you want to send data from it to the HDF5 serializer, you will have to do some sort of messaging: use a multiprocessing.Queue, a Pipe, or possibly ZeroMQ (e.g. a PUSH - PULL socket pair). With ZeroMQ you could even save the data on another computer.
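A minimal PUSH - PULL sketch with pyzmq could look like this (my illustration only: the tcp://127.0.0.1:5555 endpoint and the producer/consumer names are assumptions of this example, and the HDF5 write itself is left as a stub):

import numpy as np
import zmq

ENDPOINT = "tcp://127.0.0.1:5555"

def producer():
    # Acquisition side: PUSHes each block of samples as a pickled object.
    sock = zmq.Context.instance().socket(zmq.PUSH)
    sock.connect(ENDPOINT)
    for _ in range(10):
        sock.send_pyobj(np.random.rand(50000, 1))
    sock.send_pyobj(None)  # sentinel: tell the writer to stop

def consumer():
    # Writer side: PULLs blocks and hands them to the HDF5 serializer.
    # Bind here; this process (or machine) owns the endpoint.
    sock = zmq.Context.instance().socket(zmq.PULL)
    sock.bind(ENDPOINT)
    while True:
        samples = sock.recv_pyobj()
        if samples is None:
            break
        # ... append `samples` to the HDFStore as in write_samples() above ...

Because the consumer binds a TCP endpoint, pointing the producer at another host's address is all it takes to move the storage to a different machine.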

EDIT/WARNING: The provided code can sometimes fail to save the data; I made it to measure performance and did not make it waterproof. When pressing Ctrl-C during processing, I sometimes get a corrupted file. I consider this problem out of the scope of this question (it should be resolved by stopping the running process carefully).
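One sketch of such a careful stop, reusing begin_io and pill2kill from the multiprocessing version above: instead of killing the writer, set the event and join, so the with pd.HDFStore(...) block gets a chance to close the file. (An interrupt landing inside an HDF5 call can still corrupt the file, so this is a mitigation, not a guarantee.)

import multiprocessing

# begin_io as defined in the multiprocessing version above

if __name__ == '__main__':
    pill2kill = multiprocessing.Event()
    p = multiprocessing.Process(target=begin_io, args=(pill2kill, 500))
    p.start()
    try:
        p.join()  # wait for the writer instead of sleeping a fixed time
    except KeyboardInterrupt:
        pass  # Ctrl-C in the parent: fall through to the clean shutdown
    finally:
        pill2kill.set()  # ask the writer loop to exit on its own
        p.join()  # the `with pd.HDFStore(...)` block then closes the file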