GIL for IO-bound thread in C extension (HDF5)
I have a sampling application that acquires 250,000 samples per second, buffers them in memory, and eventually appends them to an HDFStore provided by pandas. In general, this is great. However, I have a thread that runs and continually empties the data acquisition device (DAQ), and it needs to run on a fairly regular basis; a deviation of about a second tends to break things. Below is an extreme case of the observed timings. Start marks the beginning of a DAQ read, Finish marks its end, and IO marks an HDF write (both DAQ and IO happen in separate threads).
Start : 2016-04-07 12:28:22.241303
IO (1) : 2016-04-07 12:28:22.241303
Finish : 2016-04-07 12:28:46.573440 (0.16 Hz, 24331.26 ms)
IO Done (1) : 2016-04-07 12:28:46.573440 (24332.39 ms)
As you can see, it takes 24 seconds to perform this write (a typical write is about 40 ms). The HDD I am writing to is not under load, so this delay should not be caused by contention (its utilization is around 7% while running). I have disabled indexing on the HDFStore writes. My application runs numerous other threads, all of which print status strings, and the IO task appears to be blocking all of them. I have spent quite a bit of time stepping through the code to figure out where the slowdown occurs, and it is always inside a method provided by a C extension, which leads to my questions:
- Can Python (I am using 3.5) preempt execution inside a C extension? Concurrency: Are Python extensions written in C/C++ affected by the Global Interpreter Lock? seems to indicate that it cannot unless the extension explicitly yields.
- Does pandas' HDF5 C code implement any yielding for I/O? If so, does that mean the delay is due to a CPU-bound task? I have disabled indexing.
- Any suggestions on how I can get consistent timing? I am considering moving the HDF5 code into another process. That only helps to a point, though, since I can't really tolerate ~20 second writes anyway, especially when they are unpredictable.
Here is an example you can run to see the problem:
import pandas as pd
import numpy as np
from timeit import default_timer as timer
import datetime
import random
import threading
import time

def write_samples(store, samples, overwrite):
    frame = pd.DataFrame(samples, dtype='float64')
    if not overwrite:
        store.append("df", frame, format='table', index=False)
    else:
        store.put("df", frame, format='table', index=False)

def begin_io():
    # Raw string avoids '\t' in the path being interpreted as a tab character.
    store = pd.HDFStore(r"D:\slow\test" + str(random.randint(0, 100)) + ".h5",
                        mode='w', complevel=0)
    counter = 0
    while True:
        data = np.random.rand(50000, 1)
        start_time = timer()
        write_samples(store, data, counter == 0)
        end_time = timer()
        print("IO Done : %s (%.2f ms, %d)" % (datetime.datetime.now(),
                                              (end_time - start_time) * 1000,
                                              counter))
        counter += 1
    store.close()

def dummy_thread():
    previous = timer()
    while True:
        now = timer()
        print("Dummy Thread : %s (%d ms)" % (datetime.datetime.now(),
                                             (now - previous) * 1000))
        previous = now
        time.sleep(0.01)

if __name__ == '__main__':
    threading.Thread(target=dummy_thread).start()
    begin_io()
You will get output similar to this:
IO Done : 2016-04-08 10:51:14.100479 (3.63 ms, 470)
Dummy Thread : 2016-04-08 10:51:14.101484 (12 ms)
IO Done : 2016-04-08 10:51:14.104475 (3.01 ms, 471)
Dummy Thread : 2016-04-08 10:51:14.576640 (475 ms)
IO Done : 2016-04-08 10:51:14.576640 (472.00 ms, 472)
Dummy Thread : 2016-04-08 10:51:14.897756 (321 ms)
IO Done : 2016-04-08 10:51:14.898782 (320.79 ms, 473)
IO Done : 2016-04-08 10:51:14.901772 (3.29 ms, 474)
IO Done : 2016-04-08 10:51:14.905773 (2.84 ms, 475)
IO Done : 2016-04-08 10:51:14.908775 (2.96 ms, 476)
Dummy Thread : 2016-04-08 10:51:14.909777 (11 ms)
The answer is no, these writers do not release the GIL. See the documentation here. I know you are not actually trying to write with multiple threads, but this should clue you in: strong locks are held while a write happens, to actually prevent multiple writers. Both PyTables and h5py do this as part of the HDF5 standards.
You can look at SWMR, though it is not directly supported by pandas. The PyTables docs here and here point to solutions. These generally involve having a separate process pull data off a queue and write it; a minimal sketch of that pattern is shown below. In any event, this is usually a much more scalable pattern.
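For illustration, here is a hedged sketch of that queue-based pattern. It is not from the original answer; the function name writer_process, the file name, and the None-sentinel protocol are made up for the example, which assumes only the pandas HDFStore API already used above:

import multiprocessing

import numpy as np
import pandas as pd

def writer_process(queue, fname):
    # Dedicated writer: drains sample batches from the queue and appends
    # them to the HDF5 store; a None sentinel shuts it down cleanly.
    with pd.HDFStore(fname, mode='w', complevel=0) as store:
        first = True
        while True:
            samples = queue.get()
            if samples is None:
                break
            frame = pd.DataFrame(samples, dtype='float64')
            if first:
                store.put("df", frame, format='table', index=False)
                first = False
            else:
                store.append("df", frame, format='table', index=False)

if __name__ == '__main__':
    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=writer_process, args=(q, "data/queued.h5"))
    p.start()
    for _ in range(10):
        q.put(np.random.rand(50000, 1))  # producer side never touches HDF5
    q.put(None)                          # sentinel: tell the writer to finish
    p.join()

The producer's q.put only pickles the array into the queue, so its timing stays independent of how long the actual disk writes take.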
Thanks for the working code. I have modified it to get some insight, and later also created a modified version using multiprocessing.
Modified threaded version
All the modifications are only there to get more information, there is no conceptual change. Everything goes into one file, mthread.py, and is commented part by part.
Imports as usual:
import pandas as pd
import numpy as np
from timeit import default_timer as timer
import datetime
import random
import threading
import logging
write_samples got some logging:
def write_samples(store, samples, overwrite):
    wslog = logging.getLogger("write_samples")
    wslog.info("starting")
    frame = pd.DataFrame(samples, dtype='float64')
    if overwrite:
        store.put("df", frame, format='table', index=False)
    else:
        store.append("df", frame, format='table', index=False)
    wslog.info("finished")
begin_io got a maximal duration; exceeding it results in a warning log entry:
def begin_io(maxduration=500):
    iolog = logging.getLogger("begin_io")
    iolog.info("starting")
    try:
        fname = "data/tab" + str(random.randint(0, 100)) + ".h5"
        iolog.debug("opening store %s", fname)
        with pd.HDFStore(fname, mode='w', complevel=0) as store:
            iolog.debug("store %s open", fname)
            counter = 0
            while True:
                data = np.random.rand(50000, 1)
                start_time = timer()
                write_samples(store, data, counter == 0)
                end_time = timer()
                duration = (end_time - start_time) * 1000
                iolog.debug("IO Done : %s (%.2f ms, %d)",
                            datetime.datetime.now(),
                            duration,
                            counter)
                if duration > maxduration:
                    iolog.warning("Long duration %s", duration)
                counter += 1
    except Exception:
        iolog.exception("oops")
    finally:
        iolog.info("finished")
dummy_thread was modified to stop properly, and to warn if an iteration took too long:
def dummy_thread(pill2kill, maxduration=500):
    dtlog = logging.getLogger("dummy_thread")
    dtlog.info("starting")
    try:
        previous = timer()
        while not pill2kill.wait(0.01):
            now = timer()
            duration = (now - previous) * 1000
            dtlog.info("Dummy Thread : %s (%d ms)",
                       datetime.datetime.now(),
                       duration)
            if duration > maxduration:
                dtlog.warning("Long duration %s", duration)
            previous = now
        dtlog.debug("stopped looping.")
    except Exception:
        dtlog.exception("oops")
    finally:
        dtlog.info("finished")
And finally we call it all. Feel free to modify the log levels: WARNING shows only the excessive times, INFO and DEBUG tell much more.
if __name__ == '__main__':
    logformat = '%(asctime)-15s [%(levelname)s] - %(name)s: %(message)s'
    logging.basicConfig(format=logformat, level=logging.WARNING)
    pill2kill = threading.Event()
    t = threading.Thread(target=dummy_thread, args=(pill2kill, 500))
    t.start()
    try:
        begin_io(500)
    finally:
        pill2kill.set()
        t.join()
Running it, I get results as you described:
2016-04-08 15:29:11,428 [WARNING] - begin_io: Long duration 5169.03591156
2016-04-08 15:29:11,429 [WARNING] - dummy_thread: Long duration 5161.45706177
2016-04-08 15:29:27,305 [WARNING] - begin_io: Long duration 1447.40581512
2016-04-08 15:29:27,306 [WARNING] - dummy_thread: Long duration 1450.75201988
2016-04-08 15:29:32,893 [WARNING] - begin_io: Long duration 1610.98194122
2016-04-08 15:29:32,894 [WARNING] - dummy_thread: Long duration 1612.98394203
2016-04-08 15:29:34,930 [WARNING] - begin_io: Long duration 823.182821274
2016-04-08 15:29:34,930 [WARNING] - dummy_thread: Long duration 815.275907516
2016-04-08 15:29:43,640 [WARNING] - begin_io: Long duration 510.369062424
2016-04-08 15:29:43,640 [WARNING] - dummy_thread: Long duration 511.776924133
From the values it is clear that while begin_io is very busy and delayed (probably while the data is being written to disk), dummy_thread is delayed by almost the same amount of time.
Multiprocessing version - works well
I modified the code to run in multiple processes, and since then it really does not block dummy_thread.
2016-04-08 15:38:12,487 [WARNING] - begin_io: Long duration 755.397796631
2016-04-08 15:38:14,127 [WARNING] - begin_io: Long duration 1434.60512161
2016-04-08 15:38:15,725 [WARNING] - begin_io: Long duration 848.396062851
2016-04-08 15:38:24,290 [WARNING] - begin_io: Long duration 1129.17089462
2016-04-08 15:38:25,609 [WARNING] - begin_io: Long duration 1059.08918381
2016-04-08 15:38:31,165 [WARNING] - begin_io: Long duration 646.969079971
2016-04-08 15:38:37,273 [WARNING] - begin_io: Long duration 1699.17201996
2016-04-08 15:38:43,788 [WARNING] - begin_io: Long duration 1555.341959
2016-04-08 15:38:47,765 [WARNING] - begin_io: Long duration 639.196872711
2016-04-08 15:38:54,269 [WARNING] - begin_io: Long duration 1690.57011604
2016-04-08 15:39:06,397 [WARNING] - begin_io: Long duration 1998.33416939
2016-04-08 15:39:16,980 [WARNING] - begin_io: Long duration 2558.51006508
2016-04-08 15:39:21,688 [WARNING] - begin_io: Long duration 1132.73501396
2016-04-08 15:39:26,450 [WARNING] - begin_io: Long duration 876.784801483
2016-04-08 15:39:29,809 [WARNING] - begin_io: Long duration 709.135055542
2016-04-08 15:39:31,748 [WARNING] - begin_io: Long duration 677.506923676
2016-04-08 15:39:41,854 [WARNING] - begin_io: Long duration 770.184993744
The multiprocessing code is here:
import pandas as pd
import numpy as np
from timeit import default_timer as timer
import datetime
import random
import multiprocessing
import time
import logging

def write_samples(store, samples, overwrite):
    wslog = logging.getLogger("write_samples")
    wslog.info("starting")
    frame = pd.DataFrame(samples, dtype='float64')
    if overwrite:
        store.put("df", frame, format='table', index=False)
    else:
        store.append("df", frame, format='table', index=False)
    wslog.info("finished")

def begin_io(pill2kill, maxduration=500):
    iolog = logging.getLogger("begin_io")
    iolog.info("starting")
    try:
        fname = "data/tab" + str(random.randint(0, 100)) + ".h5"
        iolog.debug("opening store %s", fname)
        with pd.HDFStore(fname, mode='w', complevel=0) as store:
            iolog.debug("store %s open", fname)
            counter = 0
            while not pill2kill.wait(0):
                data = np.random.rand(50000, 1)
                start_time = timer()
                write_samples(store, data, counter == 0)
                end_time = timer()
                duration = (end_time - start_time) * 1000
                iolog.debug("IO Done : %s (%.2f ms, %d)",
                            datetime.datetime.now(),
                            duration,
                            counter)
                if duration > maxduration:
                    iolog.warning("Long duration %s", duration)
                counter += 1
    except Exception:
        iolog.exception("oops")
    finally:
        iolog.info("finished")

def dummy_thread(pill2kill, maxduration=500):
    dtlog = logging.getLogger("dummy_thread")
    dtlog.info("starting")
    try:
        previous = timer()
        while not pill2kill.wait(0.01):
            now = timer()
            duration = (now - previous) * 1000
            dtlog.info("Dummy Thread : %s (%d ms)",
                       datetime.datetime.now(),
                       duration)
            if duration > maxduration:
                dtlog.warning("Long duration %s", duration)
            previous = now
        dtlog.debug("stopped looping.")
    except Exception:
        dtlog.exception("oops")
    finally:
        dtlog.info("finished")

if __name__ == '__main__':
    logformat = '%(asctime)-15s [%(levelname)s] - %(name)s: %(message)s'
    logging.basicConfig(format=logformat, level=logging.WARNING)
    pill2kill = multiprocessing.Event()
    dp = multiprocessing.Process(target=dummy_thread, args=(pill2kill, 500))
    dp.start()
    try:
        p = multiprocessing.Process(target=begin_io, args=(pill2kill, 500))
        p.start()
        time.sleep(100)
    finally:
        pill2kill.set()
        dp.join()
        p.join()
Conclusions
Writing data to an HDF5 file really does block other threads, and a multiprocessing version is needed.
If you expect dummy_thread to do some real work (like collecting data to store), and you want to send that data to the HDF5 serializer, you will have to do some sort of messaging, using either multiprocessing.Queue, Pipe, or possibly ZeroMQ (e.g. a PUSH - PULL socket pair; see the sketch after this paragraph). With ZeroMQ you could even save the data on another computer.
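As a hedged sketch of the ZeroMQ variant (not part of the original answer; the tcp endpoint, port number, and the use of pyzmq's send_pyobj/recv_pyobj pickle helpers are assumptions chosen for illustration):

import zmq
import numpy as np
import pandas as pd

def hdf5_sink(endpoint="tcp://127.0.0.1:5557", fname="data/zmq.h5"):
    # PULL side: receives sample batches and serializes them to HDF5.
    ctx = zmq.Context()
    sock = ctx.socket(zmq.PULL)
    sock.bind(endpoint)
    with pd.HDFStore(fname, mode='w', complevel=0) as store:
        first = True
        while True:
            samples = sock.recv_pyobj()  # pickle transport; peers must be trusted
            if samples is None:          # sentinel ends the sink
                break
            frame = pd.DataFrame(samples, dtype='float64')
            if first:
                store.put("df", frame, format='table', index=False)
                first = False
            else:
                store.append("df", frame, format='table', index=False)

def producer(endpoint="tcp://127.0.0.1:5557", batches=10):
    # PUSH side: the data-collecting process; could run on another machine.
    ctx = zmq.Context()
    sock = ctx.socket(zmq.PUSH)
    sock.connect(endpoint)
    for _ in range(batches):
        sock.send_pyobj(np.random.rand(50000, 1))
    sock.send_pyobj(None)

Run hdf5_sink in one process (or on one machine) and producer in another; because they are separate processes, the GIL held during HDF5 writes never stalls the producer.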
EDIT/WARNING: the provided code can fail to save the data in some situations; I made it to measure performance, not to be waterproof. When pressing Ctrl-C during processing, I sometimes got a corrupted file. I consider this problem out of the scope of this question (it should be solved by careful stopping of the running process; one possible approach is sketched below).
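For completeness, a hedged sketch of one such careful shutdown (not from the original answer; it relies on the begin_io from the multiprocessing listing above): catch KeyboardInterrupt in the parent and let the event-driven loop exit, so the HDFStore context manager can close the file before the process dies:

import multiprocessing
import time

if __name__ == '__main__':
    pill2kill = multiprocessing.Event()
    # begin_io is the event-aware writer from the multiprocessing listing above
    p = multiprocessing.Process(target=begin_io, args=(pill2kill, 500))
    p.start()
    try:
        while p.is_alive():
            time.sleep(0.1)
    except KeyboardInterrupt:
        pass              # fall through to the orderly shutdown below
    finally:
        pill2kill.set()   # ask the writer loop to finish its current batch
        p.join()          # wait until the store has been closed

This is still only a sketch: on POSIX systems Ctrl-C delivers SIGINT to the whole process group, so the child may see its own KeyboardInterrupt mid-write; a truly robust shutdown needs the child to handle that as well.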